【问题标题】:sparklyr - Including null values in an Apache Spark Joinsparklyr - 在 Apache Spark Join 中包含空值
【发布时间】:2019-05-29 14:44:41
【问题描述】:

Including null values in an Apache Spark Join 的问题有 Scala、PySpark 和 SparkR 的答案,但没有 sparklyr 的答案。我一直无法弄清楚如何让 sparklyr 中的inner_join 将连接列中的空值视为相等。有谁知道如何在 sparklyr 中做到这一点?

【问题讨论】:

    标签: r apache-spark join dplyr sparklyr


    【解决方案1】:

    您可以调用隐式交叉连接:

    #' Return a Cartesian product of Spark tables
    #'
    #' @param df1 tbl_spark
    #' @param df2 tbl_spark
    #' @param explicit logical If TRUE use crossJoin otherwise 
    #'   join without expression
    #' @param suffix character suffixes to be used on duplicate names
    cross_join <- function(df1, df2, 
        explicit = FALSE, suffix = c("_x", "_y")) {
    
      common_cols <- intersect(colnames(df1), colnames(df2))
    
      if(length(common_cols) > 0) {
        df1 <- df1 %>% rename_at(common_cols, funs(paste0(., suffix[1])))
        df2 <- df2 %>% rename_at(common_cols, funs(paste0(., suffix[2])))
      }
    
      sparklyr::invoke(
        spark_dataframe(df1), 
        if(explicit) "crossJoin" else "join", 
        spark_dataframe(df2)) %>% sdf_register()
    }
    

    并使用IS NOT DISTINCT FROM过滤结果

    # Enable Cross joins
    sc %>% 
      spark_session() %>% 
      sparklyr::invoke("conf") %>%
      sparklyr::invoke("set", "spark.sql.crossJoin.enabled", "true")
    
    df1 <- copy_to(sc, tibble(id1 = c(NA, "foo", "bar"), val = 1:3))
    df2 <- copy_to(sc, tibble(id2 = c(NA, "foo", "baz"), val = 4:6))
    
    df1 %>%
      cross_join(df2) %>% 
      filter(id1 %IS NOT DISTINCT FROM% id2)
    
    # Source: spark<?> [?? x 4]
      id1   val_x id2   val_y
    * <chr> <int> <chr> <int>
    1 NA        1 NA        4
    2 foo       2 foo       5
    

    optimized execution plan:

    <jobj[62]>
      org.apache.spark.sql.catalyst.plans.logical.Join
      Join Inner, (id1#10 <=> id2#76)
    :- Project [id1#10, val#11 AS val_x#129]
    :  +- InMemoryRelation [id1#10, val#11], StorageLevel(disk, memory, deserialized, 1 replicas)
    :        +- Scan ExistingRDD[id1#10,val#11]
    +- Project [id2#76, val#77 AS val_y#132]
       +- InMemoryRelation [id2#76, val#77], StorageLevel(disk, memory, deserialized, 1 replicas)
             +- Scan ExistingRDD[id2#76,val#77]
    

    &lt;=&gt; 运算符应该以同样的方式工作:

    df1 %>%
      cross_join(df2) %>% 
      filter(id1 %<=>% id2)
    

    请注意:

    • 如果后面没有选择将结果提升为散列连接/排序合并连接或交叉连接is enabled 的选择,则隐式交叉连接将失败。
    • 在这种情况下不应使用显式交叉连接,因为它将优先于后续选择。
    • 可以使用dplyr风格的交叉连接:

      mutate(df1, `_const` = TRUE) %>%  
        inner_join(mutate(df2, `_const` = TRUE), by = c("_const")) %>% 
        select(-`_const`) %>% 
        filter(id1 %IS NOT DISTINCT FROM% id2)
      

      但我建议不要这样做,因为它不太健壮(取决于上下文优化器可能无法识别 _const 是恒定的)。

    【讨论】:

      猜你喜欢
      • 2017-06-03
      • 2021-11-14
      • 1970-01-01
      • 2021-05-18
      • 2017-02-25
      • 2021-12-27
      • 1970-01-01
      • 2017-07-30
      • 1970-01-01
      相关资源
      最近更新 更多