【问题标题】: Replace multiple values for a String column
【发布时间】: 2021-03-28 01:41:55
【问题描述】:

I'm new to Spark and struggling with this. I have two input dataframes as below (journey and country_code mapping) and need to produce another dataframe like the expected output.

Journey:-

ID journey
1 US->UK->IN
2 UK->IN->CH

Country code mapping:-

Code Country
US United States
IN India
MY Malaysia
UK United kingdom
CH China

**Expected output:-**

ID journey Journey_LongName
1 US->UK->IN United States->United kingdom->India
2 UK->IN->CH United kingdom->India->China

The country mapping is dynamic, and the order of the journey must not change in the Journey_LongName field. If any of you have solved this or have an idea, please share your thoughts. Thanks, 禅那
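For reference, the intended transformation can be sketched in plain Python (a dict-based lookup for illustration only; the real question is how to do this on Spark dataframes, where the mapping is another dataframe):

```python
# Illustration only: the country mapping as a plain dict (in Spark it is a dataframe).
code_to_country = {
    "US": "United States",
    "IN": "India",
    "MY": "Malaysia",
    "UK": "United kingdom",
    "CH": "China",
}

def expand_journey(journey: str) -> str:
    """Replace each code in a '->'-separated journey, preserving the order."""
    return "->".join(code_to_country[c] for c in journey.split("->"))

print(expand_journey("US->UK->IN"))  # United States->United kingdom->India
print(expand_journey("UK->IN->CH"))  # United kingdom->India->China
```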

【问题讨论】:

  • Spark version???

标签: apache-spark pyspark apache-spark-sql


【解决方案1】:

After all, this is just a join; the tricky part is keeping the order. I use transform to carry an index along so that the order is preserved, then use transform again at the end to drop the index.

The dataframes here are called journey and country; the result is called df.

from pyspark.sql import functions as F

df = journey.selectExpr(
    'ID', 'journey',
    """explode(
          transform(
              split(journey, '->'),
              (x, i)-> struct(x as Code, i as index)
          )
      ) as countries"""
).select(
    'ID', 'journey', 'countries.*'
).join(
    country,
    'Code'
).drop('Code').groupBy('ID').agg(
    F.first('journey').alias('journey'), 
    F.array_sort(
        F.collect_list(
            F.array('index', 'Country')
        )
    ).alias('journey_long')
).selectExpr(
    'ID', 'journey',
    """concat_ws(
           '->',
           transform(journey_long, x -> x[1])
       ) as journey_long"""
)

df.show(20,0)
+---+----------+------------------------------------+
|ID |journey   |journey_long                        |
+---+----------+------------------------------------+
|1  |US->UK->IN|United States->United kingdom->India|
|2  |UK->IN->CH|United kingdom->India->China        |
+---+----------+------------------------------------+
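The order-preserving trick above (collect each country together with its position, sort the pairs, then drop the position) can be mimicked in plain Python. Note one caveat: `F.array('index', 'Country')` forces a common element type, so the index is cast to string and the sort is lexicographic, which would misorder journeys with 10 or more legs.

```python
# Mimic array_sort(collect_list(array(index, Country))) from the query above.
# collect_list gives no ordering guarantee, so each country carries its index.
collected = [(2, "India"), (0, "United States"), (1, "United kingdom")]  # arbitrary order

ordered = [country for _, country in sorted(collected)]  # sort by index, then drop it
journey_long = "->".join(ordered)
print(journey_long)  # United States->United kingdom->India
```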

【讨论】:

    【解决方案2】:

    Using spark-sql:

    Given input 1:

    val df1=spark.sql(""" with t1 (
    select 1 c1 , 'US->UK->IN'  c2 union all
    select 2 c1 , 'UK->IN->CH' c2 )
    select c1 id, c2 journey from t1 
    """
    )
    df1.show(false)
    df1.createOrReplaceTempView("df1")
    
    +---+----------+
    |id |journey   |
    +---+----------+
    |1  |US->UK->IN|
    |2  |UK->IN->CH|
    +---+----------+
    

    Input 2:

    val df2=spark.sql(""" with t1 (
    select 'US' c1 , 'United States'  c2 union all
    select 'IN' c1 , 'India'  c2 union all
    select 'MY' c1 , 'Malaysia'  c2 union all
    select 'UK' c1 , 'United kingdom'  c2 union all
    select 'CH' c1 , 'China'  c2
    )
    select c1 code, c2 country from t1 
    """
    )
    df2.show(false)
    df2.createOrReplaceTempView("df2")
    
    +----+--------------+
    |code|country       |
    +----+--------------+
    |US  |United States |
    |IN  |India         |
    |MY  |Malaysia      |
    |UK  |United kingdom|
    |CH  |China         |
    +----+--------------+
    

    Enable cross joins:

    spark.sql(" set spark.sql.crossJoin.enabled=true ")
    

    Use posexplode, collect_list and array_sort in spark.sql:

    spark.sql("""
    select id, journey, concat_ws('->', array_sort(collect_list((e1,country))).country) result from
    (
    select id, journey, e1, e2, code, country  from 
    (select id, journey, posexplode(split(journey,"->")) (e1,e2) from df1  ) t1 cross join 
    (select code, country from df2) t2 
    where code=e2 ) t3
    group by id, journey
    order by ID
    """)
    .show(false)
    
    +---+----------+------------------------------------+
    |id |journey   |result                              |
    +---+----------+------------------------------------+
    |1  |US->UK->IN|United States->United kingdom->India|
    |2  |UK->IN->CH|United kingdom->India->China        |
    +---+----------+------------------------------------+
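posexplode pairs each array element with its position, which is what lets the final array_sort restore the journey order; in plain Python terms it is just enumerate:

```python
# Plain-Python analogue of posexplode(split(journey, '->')):
# each code becomes a (position, code) row.
journey = "US->UK->IN"
rows = list(enumerate(journey.split("->")))
print(rows)  # [(0, 'US'), (1, 'UK'), (2, 'IN')]
```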
    

    【讨论】:

    【解决方案3】:

    Another solution, using the nested transform() higher-order function, available in Spark 2.4 and later:

    df1.show(false)
    df1.createOrReplaceTempView("df1")
    
    +---+----------+
    |id |journey   |
    +---+----------+
    |1  |US->UK->IN|
    |2  |UK->IN->CH|
    +---+----------+
    
    df2.show(false)
    df2.createOrReplaceTempView("df2")
    
    +----+--------------+
    |code|country       |
    +----+--------------+
    |US  |United States |
    |IN  |India         |
    |MY  |Malaysia      |
    |UK  |United kingdom|
    |CH  |China         |
    +----+--------------+
    
    
    spark.sql("""
    select id, journey,  concat_ws('->',flatten(transform( split(journey,"->") ,(x,i) -> transform(s1, (y,j) -> (y[x]) )  )))  result from df1, 
    ( select collect_list(map(code,country)) s1 from df2 )
    """)
    .show(false)
    
    +---+----------+------------------------------------+
    |id |journey   |result                              |
    +---+----------+------------------------------------+
    |1  |US->UK->IN|United States->United kingdom->India|
    |2  |UK->IN->CH|United kingdom->India->China        |
    +---+----------+------------------------------------+
    

    The query can be shortened further by moving the collect_list into the transform as a scalar subquery:

    spark.sql("""
    select id, journey, concat_ws('->',flatten(transform( split(journey,"->") ,(x,i) -> transform( (select collect_list(map(code,country)) s1 from df2 ), (y,j) -> (y[x]) )  )))  result from df1 
    """)
    .show(false)
    
    +---+----------+------------------------------------+
    |id |journey   |result                              |
    +---+----------+------------------------------------+
    |1  |US->UK->IN|United States->United kingdom->India|
    |2  |UK->IN->CH|United kingdom->India->China        |
    +---+----------+------------------------------------+
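As I read the nested-transform query, for every code it probes every single-entry map in s1 (misses yield null), flattens the results, and relies on concat_ws skipping nulls; a plain-Python analogue of that lookup:

```python
# Plain-Python analogue of
# flatten(transform(split(journey,'->'), x -> transform(s1, y -> y[x]))):
# s1 is the array of single-entry maps collected from df2.
s1 = [{"US": "United States"}, {"IN": "India"}, {"MY": "Malaysia"},
      {"UK": "United kingdom"}, {"CH": "China"}]

codes = "US->UK->IN".split("->")
# For each code, probe every map; misses yield None (Spark: null).
flat = [m.get(c) for c in codes for m in s1]
# concat_ws skips nulls, so only the hits survive, in journey order.
result = "->".join(v for v in flat if v is not None)
print(result)  # United States->United kingdom->India
```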
    

    【讨论】:
