【Title】: Pyspark to Spark-scala conversion
【Posted】: 2021-01-31 06:01:04
【Question】:

Fellow developers,

I am building a dynamic fixed-width file reader in which the schema comes from a JSON file. My implementation language is Scala, since most of the existing code base is already written in Scala.

While searching around, I found exactly the code I need, written in PySpark. Could you please help me convert it to the equivalent Spark-Scala code, in particular the dictionary part and the loop part?

Main reference: Read fixed width file using schema from json file in pyspark

SchemaFile.json
===========================
{"Column":"id","From":"1","To":"3"}
{"Column":"date","From":"4","To":"8"}
{"Column":"name","From":"12","To":"3"}
{"Column":"salary","From":"15","To":"5"}

File = spark.read \
    .format("csv") \
    .option("header", "false") \
    .load(r"C:\Temp\samplefile.txt")

SchemaFile = spark.read \
    .json(r"C:\Temp\schemaFile\schema.json")

# collect the schema rows to the driver as a list of dicts
sfDict = [row.asDict() for row in SchemaFile.collect()]
print(sfDict)
# [{'Column': u'id', 'From': u'1', 'To': u'3'},
#  {'Column': u'date', 'From': u'4', 'To': u'8'},
#  {'Column': u'name', 'From': u'12', 'To': u'3'},
#  {'Column': u'salary', 'From': u'15', 'To': u'5'}]

from pyspark.sql.functions import substring

# build one substring column per schema row
File.select(
    *[
        substring(
            str='_c0',
            pos=int(row['From']),
            len=int(row['To'])
        ).alias(row['Column'])
        for row in sfDict
    ]
).show()

【Comments】:

    Tags: scala apache-spark pyspark apache-spark-sql fixed-width


    【Solution 1】:

    Check the code below.
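
    The transcript below starts from an already-loaded df and schema. For
    completeness, here is a minimal sketch of how they could be created (the
    file paths are placeholders, not taken from the original post):

    // load the fixed-width file as a single string column named "value"
    val df = spark.read.text("C:/Temp/samplefile.txt")

    // load the JSON-lines schema definition (columns: Column, From, To)
    val schema = spark.read.json("C:/Temp/schemaFile/schema.json")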

    scala> df.show(false)
    +--------------------+
    |value               |
    +--------------------+
    |00120181120xyz12341 |
    |00220180203abc56792 |
    |00320181203pqr25483 |
    +--------------------+
    
    scala> schema.show(false)
    +------+----+---+
    |Column|From|To |
    +------+----+---+
    |id    |1   |3  |
    |date  |4   |8  |
    |name  |12  |3  |
    |salary|15  |5  |
    +------+----+---+
    
    scala> import org.apache.spark.sql.functions._
    import org.apache.spark.sql.functions._

    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
    val columns = schema
      .withColumn("id", lit(1))          // constant key so every row lands in one group
      .groupBy($"id")
      .agg(collect_list(concat(lit("substring(value,"), $"from", lit(","), $"to", lit(") as "), $"column")).as("data"))
      .withColumn("data", explode($"data"))
      .select($"data")
      .map(_.getAs[String](0))           // Dataset[String] of SQL expressions
      .collect
    
    // Exiting paste mode, now interpreting.
    
    columns: Array[String] = Array(substring(value,1,3) as id, substring(value,4,8) as date, substring(value,12,3) as name, substring(value,15,5) as salary)
    
    scala> df.selectExpr(columns:_*).show(false)
    +---+--------+----+------+
    |id |date    |name|salary|
    +---+--------+----+------+
    |001|20181120|xyz |12341 |
    |002|20180203|abc |56792 |
    |003|20181203|pqr |25483 |
    +---+--------+----+------+
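
    For the "dictionary part and the loop part" specifically, a more literal
    translation of the PySpark snippet is also possible: the collected list of
    dicts becomes an Array[Row], and the list comprehension becomes a map over
    it. A sketch, assuming the same df and schema as above:

    import org.apache.spark.sql.functions.{col, substring}

    // "dictionary part": collect the schema rows to the driver;
    // each Row plays the role of the Python dict
    val sfRows = schema.collect()

    // "loop part": build one substring Column per schema row,
    // mirroring the Python list comprehension
    val cols = sfRows.map { row =>
      substring(
        col("value"),
        row.getAs[String]("From").toInt,
        row.getAs[String]("To").toInt
      ).alias(row.getAs[String]("Column"))
    }

    df.select(cols: _*).show(false)

    Both versions yield the same four columns; this one stays in the typed
    Column API instead of generating SQL expression strings for selectExpr.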
    
    

    【Discussion】:
