【Question】: Spark/Java: Dataframe String column to Struct
【Posted】: 2019-08-05 05:35:20
【Description】:

I have a dataset like this:

+---+-------------------+-----------------------+
|id |time               |range                  |
+---+-------------------+-----------------------+
|id1|2019-03-11 05:00:00|00h00-07h30;23h30-23h59|
|id2|2019-03-11 09:00:00|00h00-07h30;23h30-23h59|
|id3|2019-03-11 10:30:00|00h00-07h30;23h30-23h59|
+---+-------------------+-----------------------+

with the schema

root
 |-- id: string (nullable = true)
 |-- time: string (nullable = true)
 |-- range: string (nullable = true)

I want to filter the rows whose hour/minute in the `time` column falls between the hour/minute intervals in the `range` column:

+---+-------------------+-----------------------+-----------+
|id |time               |range                  |between    |
+---+-------------------+-----------------------+-----------+
|id1|2019-03-11 05:00:00|00h00-07h30;23h30-23h59|true       |
|id2|2019-03-11 09:00:00|00h00-07h30;23h30-23h59|false      |
|id3|2019-03-11 10:30:00|00h00-07h30;23h30-23h59|false      |
+---+-------------------+-----------------------+-----------+

I know that in Scala I would have to convert the range column into something like

array(named_struct("start", "00h00", "end", "03h00"), named_struct("start", "15h30", "end", "17h30"), named_struct("start", "21h00", "end", "23h59"))

But I haven't found a way to do that in Java. How can I do it, or is there a better solution?

Thanks.

【Comments】:

    Tags: java apache-spark dataset


    【Solution 1】:

    One way to do this is to:

    1. Normalize your time using Spark's static functions.
    2. Use a UDF (user-defined function) to check whether the value falls within a range.

    Using the static functions:

    df = df
        .withColumn(
            "date",
            date_format(col("time"), "yyyy-MM-dd HH:mm:ss.SSSS"))
        .withColumn("h", hour(col("date")))
        .withColumn("m", minute(col("date")))
        .withColumn("s", second(col("date")))
        .withColumn("event", expr("h*3600 + m*60 + s"))
        .drop("date")
        .drop("h")
        .drop("m")
        .drop("s");
    

    If your dataframe looked like this before:

    +---+-------------------+-----------------------+
    |id |time               |range                  |
    +---+-------------------+-----------------------+
    |id1|2019-03-11 05:00:00|00h00-07h30;23h30-23h59|
    |id2|2019-03-11 09:00:00|00h00-07h30;23h30-23h59|
    |id3|2019-03-11 10:30:00|00h00-07h30;23h30-23h59|
    +---+-------------------+-----------------------+
    

    afterwards, it should look like this:

    +---+-------------------+-----------------------+-----+
    |id |time               |range                  |event|
    +---+-------------------+-----------------------+-----+
    |id1|2019-03-11 05:00:00|00h00-07h30;23h30-23h59|18000|
    |id2|2019-03-11 09:00:00|00h00-07h30;23h30-23h59|32400|
    |id3|2019-03-11 10:30:00|00h00-07h30;23h30-23h59|37800|
    +---+-------------------+-----------------------+-----+
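    The `event` column is simply the time of day converted to seconds since midnight. As a quick sanity check of the arithmetic (a hypothetical helper, not part of the answer's code):

```java
public class EventSeconds {
  // Seconds since midnight, matching expr("h*3600 + m*60 + s") above.
  static int secs(int h, int m, int s) {
    return h * 3600 + m * 60 + s;
  }

  public static void main(String[] args) {
    System.out.println(secs(5, 0, 0));   // id1: 18000
    System.out.println(secs(9, 0, 0));   // id2: 32400
    System.out.println(secs(10, 30, 0)); // id3: 37800
  }
}
```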
    

    Using the UDF:

    df = df.withColumn("between",
        callUDF("inRange", col("range"), col("event")));
    

    The result will be:

    +---+-------------------+-----------------------+-----+-------+
    |id |time               |range                  |event|between|
    +---+-------------------+-----------------------+-----+-------+
    |id1|2019-03-11 05:00:00|00h00-07h30;23h30-23h59|18000|true   |
    |id2|2019-03-11 09:00:00|00h00-07h30;23h30-23h59|32400|false  |
    |id3|2019-03-11 10:30:00|00h00-07h30;23h30-23h59|37800|false  |
    +---+-------------------+-----------------------+-----+-------+
    

    InRangeUdf

    Your UDF looks like this:

    package net.jgp.books.sparkInAction.ch14.lab900_in_range;
    
    import org.apache.spark.sql.api.java.UDF2;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class InRangeUdf implements UDF2<String, Integer, Boolean> {
      private static Logger log = LoggerFactory
          .getLogger(InRangeUdf.class);
    
      private static final long serialVersionUID = -21621751L;
    
      @Override
      public Boolean call(String range, Integer event) throws Exception {
        log.debug("-> call({}, {})", range, event);
        String[] ranges = range.split(";");
        for (int i = 0; i < ranges.length; i++) {
          log.debug("Processing range #{}: {}", i, ranges[i]);
          String[] hours = ranges[i].split("-");
          int start =
              Integer.valueOf(hours[0].substring(0, 2)) * 3600 +
                  Integer.valueOf(hours[0].substring(3)) * 60;
          int end =
              Integer.valueOf(hours[1].substring(0, 2)) * 3600 +
                  Integer.valueOf(hours[1].substring(3)) * 60;
          log.debug("Checking between {} and {}", start, end);
          if (event >= start && event <= end) {
            return true;
          }
        }
        return false;
      }
    
    }
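    Since the UDF body is plain Java, the interval parsing can be sanity-checked without a Spark session. A minimal standalone replica of the same loop (class and method names are illustrative):

```java
public class InRangeCheck {
  // Replica of InRangeUdf.call(): true if event (seconds since midnight)
  // falls inside any "HHhMM-HHhMM" interval of the ';'-separated range.
  static boolean inRange(String range, int event) {
    for (String interval : range.split(";")) {
      String[] hours = interval.split("-");
      int start = Integer.parseInt(hours[0].substring(0, 2)) * 3600
          + Integer.parseInt(hours[0].substring(3)) * 60;
      int end = Integer.parseInt(hours[1].substring(0, 2)) * 3600
          + Integer.parseInt(hours[1].substring(3)) * 60;
      if (event >= start && event <= end) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    String r = "00h00-07h30;23h30-23h59";
    System.out.println(inRange(r, 18000)); // id1, 05:00 -> true
    System.out.println(inRange(r, 32400)); // id2, 09:00 -> false
    System.out.println(inRange(r, 37800)); // id3, 10:30 -> false
  }
}
```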
    

    Driver code

    Your driver code looks like this:

    package net.jgp.books.sparkInAction.ch14.lab900_in_range;
    
    import static org.apache.spark.sql.functions.*;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    
    /**
     * Custom UDF to check if in range.
     * 
     * @author jgp
     */
    public class InCustomRangeApp {
    
      /**
       * main() is your entry point to the application.
       * 
       * @param args
       */
      public static void main(String[] args) {
        InCustomRangeApp app = new InCustomRangeApp();
        app.start();
      }
    
      /**
       * The processing code.
       */
      private void start() {
        // Creates a session on a local master
        SparkSession spark = SparkSession.builder()
            .appName("Custom UDF to check if in range")
            .master("local[*]")
            .getOrCreate();
        spark.udf().register(
            "inRange",
            new InRangeUdf(),
            DataTypes.BooleanType);
    
        Dataset<Row> df = createDataframe(spark);
        df.show(false);
    
        df = df
            .withColumn(
                "date",
                date_format(col("time"), "yyyy-MM-dd HH:mm:ss.SSSS"))
            .withColumn("h", hour(col("date")))
            .withColumn("m", minute(col("date")))
            .withColumn("s", second(col("date")))
            .withColumn("event", expr("h*3600 + m*60 + s"))
            .drop("date")
            .drop("h")
            .drop("m")
            .drop("s");
        df.show(false);
    
        df = df.withColumn("between",
            callUDF("inRange", col("range"), col("event")));
        df.show(false);
      }
    
      private static Dataset<Row> createDataframe(SparkSession spark) {
        StructType schema = DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField(
                "id",
                DataTypes.StringType,
                false),
            DataTypes.createStructField(
                "time",
                DataTypes.StringType,
                false),
            DataTypes.createStructField(
                "range",
                DataTypes.StringType,
                false) });
    
        List<Row> rows = new ArrayList<>();
        rows.add(RowFactory.create("id1", "2019-03-11 05:00:00",
            "00h00-07h30;23h30-23h59"));
        rows.add(RowFactory.create("id2", "2019-03-11 09:00:00",
            "00h00-07h30;23h30-23h59"));
        rows.add(RowFactory.create("id3", "2019-03-11 10:30:00",
            "00h00-07h30;23h30-23h59"));
    
        return spark.createDataFrame(rows, schema);
      }
    }
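    As an aside, on Spark 2.4+ the same check can be expressed with the `exists()` higher-order function in Spark SQL, avoiding UDF registration entirely. A sketch that builds such an expression string (the helper name, and passing the result through `functions.expr`, are assumptions, not part of the answer above):

```java
public class InRangeExprBuilder {
  // Builds a Spark SQL (2.4+) expression using the exists() higher-order
  // function: true if eventCol falls inside any "HHhMM-HHhMM" interval
  // of the ';'-separated rangeCol.
  static String inRangeExpr(String rangeCol, String eventCol) {
    // "HHhMM" -> seconds since midnight, e.g. "07h30" -> 27000.
    String toSecs = "cast(substring(%1$s, 1, 2) as int) * 3600"
        + " + cast(substring(%1$s, 4, 2) as int) * 60";
    String start = String.format(toSecs, "split(r, '-')[0]");
    String end = String.format(toSecs, "split(r, '-')[1]");
    return String.format(
        "exists(split(%s, ';'), r -> %s between (%s) and (%s))",
        rangeCol, eventCol, start, end);
  }

  public static void main(String[] args) {
    System.out.println(inRangeExpr("range", "event"));
    // Sketch of use in the driver, assuming the event column from above:
    // df = df.withColumn("between", expr(inRangeExpr("range", "event")));
  }
}
```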
    

    【Discussion】:
