【问题标题】:How to handle Incremental Update in HDFS hadoop Map-Reduce如何处理 HDFS hadoop Map-Reduce 中的增量更新
【发布时间】:2017-10-25 15:48:14
【问题描述】:

我在 HDF 中有结构化的基本文本文件,其中包含这样的数据(在 file.txt 中):

OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|

4295877341|^|136|^|4|^|1|^|I|!|
4295877346|^|136|^|4|^|1|^|I|!|
4295877341|^|138|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877341|^|145|^|14|^|1|^|I|!|
123456789|^|145|^|14|^|1|^|I|!|

file.txt 的大小为 30 GB。

我有大约 2 GB 的增量数据 file1.txt 以相同的格式出现在 HFDS 中,如下所示:

OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|

4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|215|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877343|^|149|^|14|^|2|^|I|!|
123456789|^|145|^|14|^|1|^|D|!|

现在我必须合并 file.txt 和 file1.txt 并创建一个包含所有唯一记录的最终文本文件。

两个文件中的键都是 OrgId。如果在第一个文件中找到相同的 OrgId,那么我必须用新的 OrgId 替换,如果没有,那么我必须插入新的 OrgId。

最终输出是这样的。

OgId|^|ItemId|^|segmentId|^|Sequence|^|Action|!|

4295877346|^|136|^|4|^|1|^|I|!|
4295877341|^|213|^|4|^|1|^|I|!|
4295877341|^|215|^|2|^|1|^|I|!|
4295877341|^|141|^|4|^|1|^|I|!|
4295877341|^|143|^|2|^|1|^|I|!|
4295877343|^|149|^|14|^|2|^|I|!|

如何在 mapreduce 中做到这一点?

我不会选择 HIVE 解决方案,因为我有很多这样的不同文件,大约 10.000 个,所以我必须在 HIVE 中创建 10.000 个分区。

对这个用例使用 Spark 有什么建议吗?

【问题讨论】:

  • 为什么要在 mapreduce 中?我可以建议你在 scala 中回答 Spark 和 Hadoop 吗?
  • 是的,请......一些代码会很棒
  • 我猜你对 Spark、Scala 和 dataFrame 有一定的了解,对吗?
  • 我没看懂你的台词.If same OrgId is found in the first file then i have to replace with the new OrgId and if not then then i have to insert new OrgId.你能澄清一下吗?
  • 是的,我已经开始学习了。说到我的问题,基本上我必须用新记录替换记录,如果 OrgId 匹配,如果不匹配,则将其视为新记录,因此将被附加。

标签: hadoop apache-spark mapreduce hdfs


【解决方案1】:

我建议你在scala 中为spark 编程。如果您在mapreduce 中编程,它仅对hadoop 有用,但在scala 中为spark 编程将使您能够在sparkhadoop 中进行处理。发起Spark 是为了处理mapreduce 模型中的缺陷。您可以找到有关此主题的许多资源。其中之一是this

关于你的问题,我建议你使用dataframe

第一个任务是为数据框创建schema

val schema = StructType(Array(StructField("OgId", StringType),
  StructField("ItemId", StringType),
  StructField("segmentId", StringType),
  StructField("Sequence", StringType),
  StructField("Action", StringType)))

下一个任务是读取这两个文件并使用上述架构创建数据框

import org.apache.spark.sql.functions._
val textRdd1 = sparkContext.textFile("input path to file1 in hdfs")
val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\\|\\^\\|", -1)))
var df1 = sqlContext.createDataFrame(rowRdd1, schema)
df1 = df1.withColumn("Action", regexp_replace($"Action", "[|!|]", ""))

val textRdd2 = sparkContext.textFile("input path to file 2 in hdfs")
val rowRdd2 = textRdd2.map(line => Row.fromSeq(line.split("\\|\\^\\|", -1)))
var df2 = sqlContext.createDataFrame(rowRdd2, schema)
df2 = df2.withColumn("Action", regexp_replace($"Action", "[|!|]", ""))

df1 的输出是

+----------+------+---------+--------+------+
|OgId      |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|136   |4        |1       |I     |
|4295877346|136   |4        |1       |I     |
|4295877341|138   |2        |1       |I     |
|4295877341|141   |4        |1       |I     |
|4295877341|143   |2        |1       |I     |
|4295877341|145   |14       |1       |I     |
+----------+------+---------+--------+------+

df2 的输出是

+----------+------+---------+--------+------+
|OgId      |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877341|213   |4        |1       |I     |
|4295877341|215   |2        |1       |I     |
|4295877341|141   |4        |1       |I     |
|4295877341|143   |2        |1       |I     |
|4295877343|149   |14       |2       |I     |
+----------+------+---------+--------+------+

现在根据您的要求,如果 OgIddf2 匹配,您想从 df1 中删除 rows,并将所有 df2 附加到 df1。这些要求可以按如下方式完成

val tempdf = df2.select("OgId").withColumnRenamed("OgId", "OgId_1")

df1 = df1.join(tempdf, df1("OgId") === tempdf("OgId_1"), "left")
df1 = df1.filter("OgId_1 is null").drop("OgId_1")
df1 = df1.union(df2)

最终输出是

+----------+------+---------+--------+------+
|OgId      |ItemId|segmentId|Sequence|Action|
+----------+------+---------+--------+------+
|4295877346|136   |4        |1       |I     |
|4295877341|213   |4        |1       |I     |
|4295877341|215   |2        |1       |I     |
|4295877341|141   |4        |1       |I     |
|4295877341|143   |2        |1       |I     |
|4295877343|149   |14       |2       |I     |
+----------+------+---------+--------+------+

这个最终结果可以保存在hdfs

df1.write.format("com.databricks.spark.csv").save("output file path in hdfs")

希望对你有帮助

注意:确保输入和输出位置的路径写的正确

【讨论】:

  • 如果您遇到任何问题,请告诉我。 :) 如果您认为答案值得一票,请投票。 ;) 谢谢
  • 嗨 Ramesh 只是一个问题,如果我将有与基本文件的标题不同的增量文件,那么这个解决方案也可以工作吗?而且我的增量文件也是有序的,所以如果我们加入然后更新顺序会保留吗?
  • 标头名称是用架构定义的。因此,标题名称是否不同并不重要。但是数据应该是相同的列格式。我不明白你的第二个问题。
  • 嗨,Ramesh 也是这个主要的顺序吗?我的意思是先出现的记录会先更新?
  • 和他们一起玩 :) 我想你可以做到。我建议你增加执行器核心:)
猜你喜欢
  • 2012-05-12
  • 1970-01-01
  • 2016-10-25
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2012-07-07
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多