这是一个可能的解决方案。正如我在评论中提到的,它几乎是 3 或 4 班轮,但我提供了一些替代方案。
// Load the data into 2 dataframes
val df1 = spark.read.option("sep","|").csv("file1a.txt")
val df2 = spark.read.option("sep","|").csv("file2a.txt")
// Next join the two dataframes using an INNER JOIN on the key as follows:
val joined = df1.joinWith(df2, df1.col("_c0") === df2.col("_c0"))
文件中没有标题信息,因此列将获得默认名称。连接的模式基本上是一个 Tuple2,每个 Tuple2 包含连接每一侧的列列表。
以下是架构:
scala> df1.printSchema
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)
scala> joined.printSchema
root
|-- _1: struct (nullable = false)
| |-- _c0: string (nullable = true)
| |-- _c1: string (nullable = true)
| |-- _c2: string (nullable = true)
| |-- _c3: string (nullable = true)
|-- _2: struct (nullable = false)
| |-- _c0: string (nullable = true)
| |-- _c1: string (nullable = true)
| |-- _c2: string (nullable = true)
| |-- _c3: string (nullable = true)
最后一步(我认为)是对您想要生产的产品的推断。我认为您想显示哪些列具有不同的值。恕我直言,您显示的输出格式有几个潜在的问题。我认为您只是想在两个输出列中显示一个不同的值。恕我直言,这有几个挑战:
- 如果记录中有两列具有不同的值 - 因此您需要为每个输出记录显示 4 个(或更多)值?
- 如果有很多不同的记录,将很难找到不同的原始列(因为在输出中丢失了任何有差异的列的标识) - 这将更加困难具有大量列的记录。
- 当您随机排列结果集中的列时,解决方案可能会更加复杂。
以下输出格式通过显示所有列以及指示哪些列具有不同值来解决上述问题。该指标是关键,因为它使差异更容易发现。
这是“蛮力”方法,其中列出了每一列并手动确定每个差异。
joined.select($"_1._C0".as("id"), $"_1._c1", $"_2._c1", when(col("_1._c1") === col("_2._c1"), "").otherwise("ne").as("c1 Ind"),
$"_1._c2", $"_2._c2", when(col("_1._c2") === col("_2._c2"), "").otherwise("ne").as("c2 Ind"),
$"_1._c3", $"_2._c3", when(col("_1._c3") === col("_2._c3"), "").otherwise("ne").as("c3 Ind")).show(false)
产生:
+-----+-----+----+------+----------------+------------------+------+------------+------------+------+
|id |_c1 |_c1 |c1 Ind|_c2 |_c2 |c2 Ind|_c3 |_c3 |c3 Ind|
+-----+-----+----+------+----------------+------------------+------+------------+------------+------+
|12343|John |John| |Rear exit market|World Centre Phase|ne |SanFrancisco|SanFrancisco| |
|54345|Fersi|Posi|ne |Dallas Road Pnth|Dallas Road Pnth | |Newyork |Newyork | |
+-----+-----+----+------+----------------+------------------+------+------------+------------+------+
蛮力方法繁琐且难以输入 - 特别是对于较大的结果集。所以我们可以使用一些 Scala 魔法来使它更优雅一点。
// Define a helper function that takes a column name and returns the three parts needed
// to generate the output for that column. i.e. select the column from the two sides of the joined result set
// and generate the case statement to generate the "ne" indicator if the two values
// are unequal.
def genComp(colName:String) = List(s"_1.$colName", s"_2.$colName", s"case when _1.$colName = _2.$colName then '' else 'ne' end as ${colName}_ind")
// Run the query to produce the results:
joined.selectExpr(
(List("_1._C0 as id") ++ genComp("_c1") ++ genComp("_c2") ++ genComp("_c3")) : _*
).show(false)
运行时,这会产生与“蛮力”方法相同的结果。
这是如何工作的?那么神奇的是第二行和 selectExpr 方法的一个特性。
selectExpr 方法具有以下签名:def selectExpr(exprs: String*): org.apache.spark.sql.DataFrame。这意味着它可以接受可变数量的字符串参数。
为了生成传递给 selectExpr 的参数,我使用了这个构造 List (strings) : _*。这是 Scala 的“魔法”,它接受一个字符串列表并将其转换为可变数量的参数参数列表。
剩下的就很简单了。基本上,genComp 函数返回一个字符串列表,这些字符串标识连接 DataFrame 每一侧的列以及不等式指示符生成逻辑。将它们连接在一起。结果被转换为传递给 selectExpr 的参数列表,最终运行与“蛮力”方法相同的查询。
这里有一个有趣的想法,我将留给您作为练习:使用 df1 的模式生成列列表以使用 genComp 输出(而不是像我展示的那样简单地手动连接它们)。
这是一个大提示:
val cols = df1.schema.filter(c => c.name != "_c0").map(c => List(c.name)).flatten
cols.foreach(println)