【问题标题】:Implementing MERGE INTO sql in pyspark在 pyspark 中实现 MERGE INTO sql
【发布时间】:2019-01-12 11:53:21
【问题描述】:

如何有问题地(pyspark)sql MERGE INTO 语句可以实现。我有两个表,我使用 createOrReplaceTempView 选项将它们放入临时视图中。然后我尝试在这两个临时视图上使用 MERGE INTO 语句。但它失败了。原因可能是 SPARK SQL 不支持 MERGE。有人可以提示如何在 pyspark 中以编程方式实现一个简单的 MERGE INTO SQL 等效语句(如下所示)。

MERGE INTO events
USING updates
ON events.eventId = updates.eventId
WHEN MATCHED THEN
  UPDATE SET
    events.data = updates.data
WHEN NOT MATCHED
  THEN INSERT (date, eventId, data) VALUES (date, eventId, data)

【问题讨论】:

  • 嗨 Kaushik,没有直接的方法可以在 spark 中进行合并。虽然您可以使用案例方法在逻辑上使用普通 sql 来完成。
  • 能否分享一个小例子代码

标签: sql merge pyspark apache-spark-sql


【解决方案1】:

Merge 不直接支持,但是如果我们可以覆盖整个表,那么您可以按照该方法。

hiveContext.sql("select * from events").registerTempTable("temp_events")
hiveContext.sql("select * from updates").registerTempTable("temp_updates")

hiveContext("""
select
case when b.eventId is null then a.date else b.date as date,
case when b.eventId is null then a.eventId else b.eventId end as eventId,
case when b.eventId is null then a.data else b.data as data
from
temp_events a
full outer join
temp_updates b
on a.eventId=b.eventId
""").registerTempTable("FinalData")

hiveContext.sql("INSERT OVERWRITE TABLE table_name select * from FinalData")

使用这种情况,我们确保数据是否在新集合中可用,然后我们将采用这些值,否则我们将采用旧值。

请检查此解决方案是否适合您。

谢谢, 手动

【讨论】:

  • 取决于数据。查询可以通过使用分桶或其他方法进行优化,而使用 Spark,我们可以承担这个风险。我编写了处理 1000 万个数据的代码,它还有多个其他查询。需要 5-7 分钟。
  • 您使用的是哪个版本的 hive 和 Spark?使用 Orc 更新命令有效。因此,如果可行,我们可以更新新记录,我们可以通过 join 找出新记录,然后我们可以合并记录。这可能是一个有效的解决方案。我明天会检查并测试它并更新。
  • 我们使用的是 Spark 2.2.0,我们使用的是 PARQUET 格式的表,而不是 ORC
  • 上述查询中MERGE语句的UPDATE部分是如何处理的?
  • 在插入或更新情况下,b.eventid 不会为空,所以我们处理了这种情况。如果记录在更新中不可用,那么它将为空,在这种情况下,我们将从旧记录中挑选数据。还有一件事,我只是想告知已经有可用的密钥,所以我没有包含哈希函数,否则我们可以包含哈希函数来优化性能。
猜你喜欢
  • 2015-01-07
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-09-26
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多