【问题标题】:How to compare 2 files in spark using scala with the same schema如何使用具有相同架构的scala比较spark中的2个文件
【发布时间】:2019-09-25 01:02:54
【问题描述】:

所以我有这 2 个文件(获取 hive 表的 hdfs 文件位置),带有历史记录的 file1 和带有当天记录的 file2。它们都具有相同的架构。现在我想做一个 CDC 进程来比较两个文件后获取更新/新插入的记录。 多列中可能有更改,因此我们希望一次提取所有更改的列。 假设这些列是:- Customer_ID 、名称、地址、国家。 现在 Customer_ID 是主键,而其余 3 列可能会更改。

文件 1

12343| John| Rear exit market| SanFrancisco
45656| Bobs| Knewbound Road PD| Seattle
54345| Fersi| Dallas Road Pnth| Newyork
86575| Persa| Roman Building Path| Kirkland
64565| Camy| Olympus Ground 3rd| NewJersey

文件 2

12343| John| World Centre Phase| SanFrancisco
54345| Posi| Dallas Road Pnth| Newyork

我希望最终结果看起来像:-

12343|Rear exit market| World Centre Phase
54345| Fersi| Posi

所以我想要主键,更改之前的先前记录,在我的最终答案中更新的新更新记录。

【问题讨论】:

  • 我们可以看看你的代码示例吗?
  • 我这里没有,不能从我办公室的工作站复制,因为这是不允许的。你能建议如何前进
  • 为什么一条记录 (12343) 从第 3 列取值,而另一条从第 2 列取值?您能否根据您提供的数据重新输入您迄今为止尝试过的内容?到目前为止,它看起来像一个 3 行(将两个文件加载到两个数据集中并加入它们)所以不应该输入太多

标签: scala apache-spark dataframe


【解决方案1】:

这是一个可能的解决方案。正如我在评论中提到的,它几乎是 3 或 4 班轮,但我提供了一些替代方案。

// Load the data into 2 dataframes
val df1 = spark.read.option("sep","|").csv("file1a.txt")
val df2 = spark.read.option("sep","|").csv("file2a.txt")

// Next join the two dataframes using an INNER JOIN on the key as follows:
val joined = df1.joinWith(df2, df1.col("_c0") === df2.col("_c0"))

文件中没有标题信息,因此列将获得默认名称。连接的模式基本上是一个 Tuple2,每个 Tuple2 包含连接每一侧的列列表。

以下是架构:

scala> df1.printSchema
root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)


scala> joined.printSchema
root
 |-- _1: struct (nullable = false)
 |    |-- _c0: string (nullable = true)
 |    |-- _c1: string (nullable = true)
 |    |-- _c2: string (nullable = true)
 |    |-- _c3: string (nullable = true)
 |-- _2: struct (nullable = false)
 |    |-- _c0: string (nullable = true)
 |    |-- _c1: string (nullable = true)
 |    |-- _c2: string (nullable = true)
 |    |-- _c3: string (nullable = true)

最后一步(我认为)是对您想要生产的产品的推断。我认为您想显示哪些列具有不同的值。恕我直言,您显示的输出格式有几个潜在的问题。我认为您只是想在两个输出列中显示一个不同的值。恕我直言,这有几个挑战:

  1. 如果记录中有两列具有不同的值 - 因此您需要为每个输出记录显示 4 个(或更多)值?
  2. 如果有很多不同的记录,将很难找到不同的原始列(因为在输出中丢失了任何有差异的列的标识) - 这将更加困难具有大量列的记录。
  3. 当您随机排列结果集中的列时,解决方案可能会更加复杂。

以下输出格式通过显示所有列以及指示哪些列具有不同值来解决上述问题。该指标是关键,因为它使差异更容易发现。 这是“蛮力”方法,其中列出了每一列并手动确定每个差异。

joined.select($"_1._C0".as("id"), $"_1._c1", $"_2._c1", when(col("_1._c1") === col("_2._c1"), "").otherwise("ne").as("c1 Ind"),
  $"_1._c2", $"_2._c2", when(col("_1._c2") === col("_2._c2"), "").otherwise("ne").as("c2 Ind"),
  $"_1._c3", $"_2._c3", when(col("_1._c3") === col("_2._c3"), "").otherwise("ne").as("c3 Ind")).show(false)

产生:

+-----+-----+----+------+----------------+------------------+------+------------+------------+------+
|id   |_c1  |_c1 |c1 Ind|_c2             |_c2               |c2 Ind|_c3         |_c3         |c3 Ind|
+-----+-----+----+------+----------------+------------------+------+------------+------------+------+
|12343|John |John|      |Rear exit market|World Centre Phase|ne    |SanFrancisco|SanFrancisco|      |
|54345|Fersi|Posi|ne    |Dallas Road Pnth|Dallas Road Pnth  |      |Newyork     |Newyork     |      |
+-----+-----+----+------+----------------+------------------+------+------------+------------+------+

蛮力方法繁琐且难以输入 - 特别是对于较大的结果集。所以我们可以使用一些 Scala 魔法来使它更优雅一点。

// Define a helper function that takes a column name and returns the three parts needed
// to generate the output for that column. i.e. select the column from the two sides of the joined result set
// and generate the case statement to generate the "ne" indicator if the two values
// are unequal.
def genComp(colName:String) = List(s"_1.$colName", s"_2.$colName", s"case when _1.$colName = _2.$colName then '' else 'ne' end as ${colName}_ind")

// Run the query to produce the results:
joined.selectExpr(
    (List("_1._C0 as id") ++ genComp("_c1") ++ genComp("_c2") ++ genComp("_c3")) : _*
  ).show(false)

运行时,这会产生与“蛮力”方法相同的结果。

这是如何工作的?那么神奇的是第二行和 selectExpr 方法的一个特性。

selectExpr 方法具有以下签名:def selectExpr(exprs: String*): org.apache.spark.sql.DataFrame。这意味着它可以接受可变数量的字符串参数。

为了生成传递给 selectExpr 的参数,我使用了这个构造 List (strings) : _*。这是 Scala 的“魔法”,它接受一个字符串列表并将其转换为可变数量的参数参数列表。

剩下的就很简单了。基本上,genComp 函数返回一个字符串列表,这些字符串标识连接 DataFrame 每一侧的列以及不等式指示符生成逻辑。将它们连接在一起。结果被转换为传递给 selectExpr 的参数列表,最终运行与“蛮力”方法相同的查询。

这里有一个有趣的想法,我将留给您作为练习:使用 df1 的模式生成列列表以使用 genComp 输出(而不是像我展示的那样简单地手动连接它们)。

这是一个提示:

val cols = df1.schema.filter(c => c.name != "_c0").map(c => List(c.name)).flatten
cols.foreach(println)

【讨论】:

  • 如果这对您的问题有所帮助,请您点击答案旁边的灰色复选标记接受它。如果它对您没有帮助,请随时补充原始问题。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-02-19
  • 2018-06-26
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多