【问题标题】:Mapping Spark DataSet row values into new hash column将 Spark DataSet 行值映射到新的哈希列
【发布时间】:2018-04-19 04:24:24
【问题描述】:

给定以下DataSet 值为inputData

column0 column1 column2 column3
A       88      text    99
Z       12      test    200
T       120     foo     12

在 Spark 中,计算新的hash 列并将其附加到新的DataSethashedData 的有效方法是什么,其中hash 定义为在每一行上应用MurmurHash3 inputData 的值。

具体来说,hashedData 为:

column0 column1 column2 column3 hash
A       88      text    99      MurmurHash3.arrayHash(Array("A", 88, "text", 99))
Z       12      test    200     MurmurHash3.arrayHash(Array("Z", 12, "test", 200))
T       120     foo     12      MurmurHash3.arrayHash(Array("T", 120, "foo", 12))

如果需要更多细节,请告诉我。

感谢任何帮助。谢谢!

【问题讨论】:

    标签: scala apache-spark spark-dataframe apache-spark-dataset


    【解决方案1】:

    一种方法是使用withColumn函数:

    import org.apache.spark.sql.functions.{col, hash}
    dataset.withColumn("hash", hash(dataset.columns.map(col):_*))
    

    【讨论】:

    • 谢谢!但我认为该行正在将列字符串名称传递给MurmurHash3(即Array("column0", "column1", "column2", "column3"))。我将尝试找到一种方法来提取映射函数上下文中的实际行值。
    • @JesúsZazueta 只是说我不认为他的解决方案只做列名。此外,还有一个简洁的函数可以获取多个列并使用它们的内容生成一个新列:df.withColumn("concat", concat(df.columns.map(col):_*)) 他们也有一些其他方法,例如specifying the join separator
    【解决方案2】:

    事实证明,Spark 已经将其实现为包 org.apache.spark.sql.functions 中的 hash 函数

    /**
     * Calculates the hash code of given columns, and returns the result as an int column.
     *
     * @group misc_funcs
     * @since 2.0
     */
    @scala.annotation.varargs
    def hash(cols: Column*): Column = withExpr {
      new Murmur3Hash(cols.map(_.expr))
    }
    

    在我的例子中,应用为:

    import org.apache.spark.sql.functions.{col, hash}
    
    val newDs = typedRows.withColumn("hash", hash(typedRows.columns.map(col): _*))
    

    关于 Spark sql,我真的有很多东西要学 :(.

    把它留在这里以防其他人需要它。谢谢!

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-10-01
      • 1970-01-01
      • 2012-08-05
      相关资源
      最近更新 更多