【问题标题】:Retrieve Spark Dataset with Distinct values in a Column检索列中具有不同值的 Spark 数据集
【发布时间】:2019-05-13 17:02:54
【问题描述】:

我已经从一个 csv 文件创建了一个 Spark 数据集。

架构是:

 |-- FirstName: string (nullable = true)<br>
 |-- LastName: string (nullable = true)<br>
 |-- Email: string (nullable = true)<br>
 |-- Phone: string (nullable = true)

我正在对电子邮件字段执行重复数据删除:

Dataset<Row> customer=  spark.read().option("header","true").option("charset","UTF8")
                    .option("delimiter",",").csv(path);

Dataset<Row> distinct =  customer.select(col).distinct();

我想创建一个输出 csv 文件,其中包含具有不同电子邮件 ID 的行。

如何查询以检索具有不同电子邮件记录的数据集?

示例输入:

John David john.david@abc.com 2222
John Smith john.smith@abc.com 4444
John D john.david@abc.com 2222

样本输出:

John David john.david@abc.com 2222
John Smith john.smith@abc.com 4444

提前致谢

【问题讨论】:

  • 这两个名字应该怎么选?任意?电子邮件也是关键吗?
  • 现在我将保留第一条记录。现在我可以编写一个带有不同电子邮件列的 csv 文件,但我想编写整个 row.distinct.coalesce(1).write().format("com.databricks.spark.csv").option ("header","true").option("delimiter","\t").save(outPath+"outputs.csv");

标签: apache-spark apache-spark-sql apache-spark-dataset


【解决方案1】:

这是使用窗口函数的一种方法。

import org.apache.spark.sql.functions.row_number
import org.apache.spark.sql.expressions.Window

val df = Seq(
("John", "David", "john.david@abc.com", 2222),
("John", "Smith", "john.smith@abc.com", 4444),
("John", "D", "john.david@abc.com", 2222)       
).toDF("FirstName", "LastName", "Email", "Phone")

val w = Window.partitionBy($"Email").orderBy($"Phone")

df.withColumn("row", row_number.over(w))
              .where($"row" === 1)
              .drop("row")
              .show(false)

代码将通过电子邮件进行分区,然后将返回每个分区的第一行。

输出:

+---------+--------+------------------+-----+
|FirstName|LastName|Email             |Phone|
+---------+--------+------------------+-----+
|John     |Smith   |john.smith@abc.com|4444 |
|John     |David   |john.david@abc.com|2222 |
+---------+--------+------------------+-----+

【讨论】:

  • 非常感谢。我正在尝试这段代码,Java 中不推荐使用 rownumber.over 函数吗?
  • 欢迎,这里是一个window java例子stackoverflow.com/questions/33319279/…
  • 这样您就可以使用Column rowNum = functions.row_number().over(w); 导入org.apache.spark.sql.functions 来访问它
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-11-26
  • 1970-01-01
  • 1970-01-01
  • 2015-01-10
  • 1970-01-01
相关资源
最近更新 更多