使用 spark-sql:
给定输入 1
// Input 1: one (id, journey) row per trip, where journey is a '->'-delimited
// list of country codes. Built inline via a CTE of literal rows.
// NOTE(review): Spark's SQL grammar makes the AS keyword optional in a CTE,
// so `with t1 ( ... )` parses the same as `with t1 as ( ... )`.
val df1=spark.sql(""" with t1 (
select 1 c1 , 'US->UK->IN' c2 union all
select 2 c1 , 'UK->IN->CH' c2 )
select c1 id, c2 journey from t1
"""
)
df1.show(false)
// Register under the name "df1" so later spark.sql queries can reference it.
df1.createOrReplaceTempView("df1")
+---+----------+
|id |journey |
+---+----------+
|1 |US->UK->IN|
|2 |UK->IN->CH|
+---+----------+
输入 2:
// Input 2: lookup table mapping a 2-letter country code to its full country
// name, again built from literal rows through a CTE.
val df2=spark.sql(""" with t1 (
select 'US' c1 , 'United States' c2 union all
select 'IN' c1 , 'India' c2 union all
select 'MY' c1 , 'Malaysia' c2 union all
select 'UK' c1 , 'United kingdom' c2 union all
select 'CH' c1 , 'China' c2
)
select c1 code, c2 country from t1
"""
)
df2.show(false)
// Register under the name "df2" so later spark.sql queries can reference it.
df2.createOrReplaceTempView("df2")
+----+--------------+
|code|country |
+----+--------------+
|US |United States |
|IN |India |
|MY |Malaysia |
|UK |United kingdom|
|CH |China |
+----+--------------+
启用交叉连接
spark.sql(" set spark.sql.crossJoin.enabled=true ")
在 spark.sql 中使用 posexplode、collect_list 和 array_sort
// Translate each journey's country codes into full country names, preserving
// the original order of the codes:
//   1. posexplode(split(journey,'->')) yields one (e1=position, e2=code) row
//      per code, so the original ordering survives the later aggregation.
//   2. An equi-join against df2 resolves each code to its country name.
//      (The original used `cross join ... where code=e2`, which forces
//      spark.sql.crossJoin.enabled and states an inner join as a Cartesian
//      product; a plain JOIN ... ON is equivalent and needs no such flag.)
//   3. collect_list gathers (position, country) structs per id; array_sort
//      orders them by position (the struct's first field); `.country`
//      projects the sorted names, and concat_ws re-joins them with '->'.
spark.sql("""
select id, journey,
       concat_ws('->', array_sort(collect_list(struct(e1, country))).country) result
from (
  select t1.id, t1.journey, t1.e1, d2.country
  from (
    select id, journey, posexplode(split(journey, '->')) as (e1, e2) from df1
  ) t1
  join df2 d2
    on d2.code = t1.e2
) t3
group by id, journey
order by id
""")
.show(false)
+---+----------+------------------------------------+
|id |journey |result |
+---+----------+------------------------------------+
|1 |US->UK->IN|United States->United kingdom->India|
|2 |UK->IN->CH|United kingdom->India->China |
+---+----------+------------------------------------+