【问题标题】:What are the best practices to partition Parquet files by timestamp in Spark?在 Spark 中按时间戳对 Parquet 文件进行分区的最佳实践是什么?
【发布时间】:2015-10-07 00:35:26
【问题描述】:

我对 Spark 还很陌生(2 天),我正在思考对 parquet 文件进行分区的最佳方法。

我的粗略计划是:

  • 使用 com.databricks.spark.csv 读取源 TSV 文件(这些文件具有 TimeStampType 列)
  • 写出 parquet 文件,按年/月/日/小时分区
  • 将这些 parquet 文件用于未来将发生的所有查询

让一个简单的版本工作起来非常容易(对 Spark 开发人员表示敬意)——除了按照我想要的方式进行分区。这是在python BTW:

input = sqlContext.read.format('com.databricks.spark.csv').load(source, schema=myschema)
input.write.partitionBy('type').format("parquet").save(dest, mode="append")

映射 RDD 的最佳方法是为年、月、日、小时添加新列,然后使用 PartitionBy?那么对于任何查询,我们必须手动添加年/月等?鉴于到目前为止我发现 spark 的优雅程度,这似乎有点奇怪。

谢谢

【问题讨论】:

    标签: apache-spark pyspark


    【解决方案1】:

    我现在已经找到了一些方法来做到这一点,还没有对它们进行性能测试,请注意:

    首先我们需要创建一个派生的DataFrame(如下图三种方式),然后写出来。

    1) sql 查询(内联函数)

    sqlContext.registerFunction("day",lambda f: f.day, IntegerType())
    input.registerTempTable("input")
    input_ts = sqlContext.sql(
      "select day(inserted_at) AS inserted_at_day, * from input")
    

    2) sql 查询(非内联) - 非常相似

    def day(ts):
      return f.day
    sqlContext.registerFunction("day",day, IntegerType())
    ... rest as before
    

    3) 带列

    from pyspark.sql.functions import udf
    day = udf(lambda f: f.day, IntegerType())
    input_ts = input.withColumn('inserted_at_day',day(input.inserted_at))
    

    只写:

    input_ts.write.partitionBy(['inserted_at_day']).format("parquet").save(dest, mode="append")
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2015-01-13
      • 1970-01-01
      • 2013-10-27
      • 1970-01-01
      • 2023-03-03
      • 2013-10-15
      • 2019-03-16
      相关资源
      最近更新 更多