【问题标题】:Spark-Java: How to change Timestamp format of columns in Dataset<Row>?Spark-Java:如何更改 Dataset<Row> 中列的时间戳格式?
【发布时间】:2019-07-05 02:50:49
【问题描述】:

我想将 Dataset 中具有 2018-08-17T19:58:46.000+0000 之类的值的时间戳字段映射为 2018-08-17 19:58:46.000 之类的格式,即 yyyy-MM-dd HH:mm:ss.SSS,以及一些列到yyyy-MM-dd

例如,我有一个数据集 DS1,其中包含 id、lastModif、created 列:

+------------------+----------------------------+----------------------------+
|Id                |lastModif                   |created                     |
+------------------+----------------------------+----------------------------+
|abc1              |2019-01-14T19:51:55.000+0000|2019-02-07T20:37:53.000+0000|
|AQA2              |2019-02-05T19:26:36.000+0000|2019-02-07T20:40:06.000+0000|
+------------------+----------------------------+----------------------------+ 

从上面的 DS1 我需要将 lastModif 列映射到格式 yyyy-MM-dd HH:mm:ss.SSScreatedTime 列映射到 yyyy-MM-dd
我有类似的 DS2、DS3,但列映射不同。
我保留了一个属性文件,它将从中获取映射列作为键和时间戳格式作为值。
在代码中,我保留了映射列和非映射列的列表,并选择了列:

String cols = "Id,created,lastModif";
String[] colArr = cols.split(",");
String mappedCols = "lastModif,created"; //hardcoding as of now.

List<String> mappedColList = Arrays.asList(mappedCols.split(","));
String nonMappedCols = getNonMappingCols(colArr, mappedCols.split(",")).toLowerCase();
List<String> nonMapped = Arrays.asList(nonMappedCols.split(","));

//column-mapping logic
filtered = tempDS.selectExpr(convertListToSeq(nonMapped),unix_timestamp($"lastModif","yyyy-MM-dd HH:mm:ss.SSS").cast("timestamp").as("lastModif"));
filtered.show(false);


public static Seq<String> convertListToSeq(List<String> inputList)
{
    return JavaConverters.asScalaIteratorConverter(inputList.iterator()).asScala().toSeq();
}

private static String getNonMappingCols(String[] cols, String[] mapped)
{
    String nonMappedCols = "";
    List<String> mappedList = Arrays.asList(mapped);

    for(int i=0; i<cols.length; i++)
    {
        if(!mappedList.contains(cols[i])){
            nonMappedCols += cols[i]+",";               
        }
    }
    nonMappedCols = nonMappedCols.substring(0, nonMappedCols.length()-1);

    return nonMappedCols;
}

如何将列映射到所需的时间戳格式?
tempDS.selectExpr(convertListToSeq(nonMapped),unix_timestamp($"lastModif","yyyy-MM-dd HH:mm:ss.SSS").cast("timestamp").as("lastModif")); 代码行中,$"lastModif" 在 Java 中是无法识别的。
其次,这种方式是一种静态方式,即硬编码映射列。如何映射我的 List&lt;String&gt; mappedColList 中的列?

【问题讨论】:

  • 你试过用new Column("lastModif")代替$"lastModif"吗?
  • 是的。它给出编译错误“unix_timestamp(Column,String) undefined”
  • 我试过org.apache.spark.sql.functions.unix_timestamp(tempDS.col("lastModif"),"yyyy-MM-dd HH:mm:ss.SSS")...编译器错误消失了,但我的数据是yyyy-MM-ddTHH:mm:ss.SSS+Z类型的字符串,例如:2019-02-07T20:37:53.000+0000,它被解析为null。

标签: java apache-spark apache-spark-sql


【解决方案1】:
  1. 首先,让我们澄清您的输入数据。您提到,您有 Timestamp,但您列出的输出格式似乎只是 String 值,表示以下格式 yyyy-MM-dd'T'HH :mm:ss.SSSZ。你能证实这个结论吗?

  2. 在您的一个 cmets 中,您回答函数 unix_timestamp 在您的尝试中返回 null。查看unix_timestamp(Column s, String p) 的文档,我们可以看到它需要其他格式来解析,否则它会返回null

参数:
s - 日期、时间戳或字符串。如果是字符串,则数据必须采用可以转换为时间戳的格式,例如 yyyy-MM-ddyyyy-MM-dd HH:mm:ss.SSSS
fmt - 当 s 为字符串时,详细说明 s 格式的日期时间模式
退货:
时间戳,如果 s 是无法转换为时间戳的字符串或 fmt 是无效格式,则为 null

  1. 如果你输入的参数真的是String,我建议你下面的解决方案,使用Spark SQL函数to_timestamp(Column s, String fmt)date_format(Column dateExpr, String format)
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.to_timestamp;
import static org.apache.spark.sql.functions.date_format;
....
SparkSession spark = SparkSession
            .builder()
            .appName("datetime-transformation")
            .master("local[*]")
            .getOrCreate();

SomeDto someDto = SomeDto.builder()
            .id("abc1")
            .lastModif("2019-01-14T19:51:55.123+02:00")
            .created("2019-01-14T19:51:55.123+02:00")
            .build();

Dataset<Row> ds = spark.createDataset(Collections.singletonList(someDto), Encoders.bean(SomeDto.class)).toDF();

ds.printSchema();
ds.show(false);

Dataset<Row> dfm = ds
            .withColumn("lastModif", to_timestamp(col("lastModif"), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"))
            .withColumn("created", date_format(to_timestamp(col("lastModif"), "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), "yyyy-MM-dd"));

dfm.printSchema();
dfm.show(false);

输出将是:

root
 |-- created: string (nullable = true)
 |-- id: string (nullable = true)
 |-- lastModif: string (nullable = true)

+-----------------------------+----+-----------------------------+
|created                      |id  |lastModif                    |
+-----------------------------+----+-----------------------------+
|2019-01-14T19:51:55.123+02:00|abc1|2019-01-14T19:51:55.123+02:00|
+-----------------------------+----+-----------------------------+

root
 |-- created: string (nullable = true)
 |-- id: string (nullable = true)
 |-- lastModif: timestamp (nullable = true)

+----------+----+-----------------------+
|created   |id  |lastModif              |
+----------+----+-----------------------+
|2019-01-14|abc1|2019-01-14 19:51:55.123|
+----------+----+-----------------------+

【讨论】:

    【解决方案2】:

    这就是我使映射动态化的方式:

    private static Dataset<Row> mapColumns(Properties mappings, String tableNm, String[] colArr, Dataset<Row> tempDS) throws Exception
    {
        String mappedCols = "lastmodif,createdDate,endDate";
        Dataset<Row> filtered = null;
        Properties mappingCols = mappings;
        List<String> mapped = Arrays.asList(mappedCols.split(","));
    
        List<String> colsList = Arrays.asList(colArr);
        ArrayList<String> tempList = new ArrayList<String>();
        Iterator itrTmp = colsList.iterator();
        while(itrTmp.hasNext()){
            tempList.add((String)itrTmp.next());
        }
    
        Iterator itr = mapped.iterator();
        filtered = tempDS.selectExpr(convertListToSeq(colsList));
    
        while(itr.hasNext()){
            String column = itr.next().toString();
            String newCol = column+"_mapped";
            String propertyKey = tableNm+"-"+column;
            String propertyValue = mappingCols.getProperty(propertyKey);
    
            filtered = filtered.selectExpr(convertListToSeq(colsList))
                    .withColumn(newCol, functions.regexp_replace(functions.substring(filtered.col(column), 0, 23),"T", " ")).alias(newCol)
                    .drop(filtered.col(column));
    
            tempList.remove(column);
            tempList.add(newCol);
            colsList = tempList;
        }
    
        filtered = filtered.selectExpr(convertListToSeq(colsList)); 
        filtered.show(false);
    }
    
    public static Seq<String> convertListToSeq(List<String> inputList)
    {
        return JavaConverters.asScalaIteratorConverter(inputList.iterator()).asScala().toSeq();
    }
    

    StringTimestamp 的转换仍在等待中。到目前为止,我正在做一个substring,但是这个逻辑适用于所有数据类型为yyyy-mm-ddThh:mm:ss.SSSZyyyy-mm-ddThh:mm:ss.SSS+0000 等的列,但如果列的数据类型为yyyy-mm-dd 并且代码将中断,则该逻辑将不起作用. 我在这里提出了这个问题:how to convert string to timestamp

    【讨论】:

      猜你喜欢
      • 2018-11-27
      • 2020-05-11
      • 2020-10-21
      • 1970-01-01
      • 1970-01-01
      • 2017-11-17
      • 1970-01-01
      • 1970-01-01
      • 2014-09-11
      相关资源
      最近更新 更多