【发布时间】:2016-06-29 20:05:33
【问题描述】:
我正在尝试将一个巨大的 RDD 的 kafka 消息转换为 parquet 格式,并使用 spark 流保存在 HDFS 中。它是一条系统日志消息,例如每行中的 name1=value1|name2=value2|name3=value3,关于如何在 spark 流中实现这一点的任何指针?
【问题讨论】:
标签: apache-kafka spark-streaming parquet
我正在尝试将一个巨大的 RDD 的 kafka 消息转换为 parquet 格式,并使用 spark 流保存在 HDFS 中。它是一条系统日志消息,例如每行中的 name1=value1|name2=value2|name3=value3,关于如何在 spark 流中实现这一点的任何指针?
【问题讨论】:
标签: apache-kafka spark-streaming parquet
只要您有 avro 架构,您就可以将 RDD 保存到 parquet 而不转换为 DataFrame
这是一个示例函数:
public <T> void save(JavaRDD<T> rdd, Class<T> clazz, Time timeStamp, Schema schema, String path) throws IOException {
Job job = Job.getInstance();
ParquetOutputFormat.setWriteSupportClass(job, AvroWriteSupport.class);
AvroParquetOutputFormat.setSchema(job, schema);
LazyOutputFormat.setOutputFormatClass(job, new ParquetOutputFormat<T>().getClass());
job.getConfiguration().set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false");
job.getConfiguration().set("parquet.enable.summary-metadata", "false");
//save the file
rdd.mapToPair(me -> new Tuple2(null, me))
.saveAsNewAPIHadoopFile(
String.format("%s/%s", path, timeStamp.milliseconds()),
Void.class,
clazz,
LazyOutputFormat.class,
job.getConfiguration());
}
【讨论】:
【讨论】: