【Question title】: Pyspark replace multiple strings in RDD
【Posted】: 2018-12-22 19:28:58
【Question】:

I want to replace multiple strings in a PySpark RDD. The replacements should be applied in order of string length, longest to shortest. The operation will ultimately replace a large volume of text, so good performance needs to be considered.

Example of the problem:

In the example below, I want to replace the strings:

 replace, text, is

with the following, in the corresponding order (longest to shortest):

 replacement1, replacement2, replacement3

That is, if the string replace is found, it should be replaced with replacement1; in this example it is searched for and replaced first.

The replacement strings will also be stored as a pyspark rdd, as shown below:

+---------+------------------+
| string  | replacement_term |
+---------+------------------+
| replace | replacement1     |
+---------+------------------+
| text    | replacement2     |
+---------+------------------+
| is      | replacement3     |
+---------+------------------+

Here is an example of an rdd whose text needs to be replaced with the terms above:

+----+-----------------------------------------+
| id | text                                    |
+----+-----------------------------------------+
| 1  | here is some text to replace with terms |
+----+-----------------------------------------+
| 2  | text to replace with terms              |
+----+-----------------------------------------+
| 3  | text                                    |
+----+-----------------------------------------+
| 4  | here is some text to replace            |
+----+-----------------------------------------+
| 5  | text to replace                         |
+----+-----------------------------------------+

I want to perform the replacements, creating an output rdd as follows:

+----+----------------------------------------------------------------+
| id | text                                                           |
+----+----------------------------------------------------------------+
| 1  | here replacement3 some replacement2 to replacement1 with terms |
+----+----------------------------------------------------------------+
| 2  | replacement2 to replacement1 with terms                        |
+----+----------------------------------------------------------------+
| 3  | replacement2                                                   |
+----+----------------------------------------------------------------+
| 4  | here replacement3 some replacement2 to replacement1            |
+----+----------------------------------------------------------------+
| 5  | replacement2 to replacement1                                   |
+----+----------------------------------------------------------------+

Thanks for your help.

【Question discussion】:

  • So, if you have an rdd with predefined replacement terms, what does "replace these strings in order of length" mean? Can't you just replace them all at once?
  • Two of the strings in my problem may conflict. For example, "is" and "is not" both contain "is". In my use case I prefer the longer string to win. Hope that makes sense.
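The longest-first requirement can be illustrated with a minimal plain-Python sketch (not Spark; the conflicting terms and replacement values here are made up for illustration):

```python
# Hypothetical conflicting terms: "is not" contains "is", so the longer
# term must be substituted first or it can never match afterwards.
terms = {"is": "replacementA", "is not": "replacementB"}

def replace_longest_first(text, mapping):
    """Apply replacements ordered by term length, longest first."""
    for term in sorted(mapping, key=len, reverse=True):
        text = text.replace(term, mapping[term])
    return text

print(replace_longest_first("that is not here", terms))
# -> "that replacementB here"
```

If the order were reversed, "is" would be consumed first and "is not" could never match, yielding "that replacementA not here" instead.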

Tags: python-3.x apache-spark pyspark


【Solution 1】:

The following code snippet uses the Spark/Scala DataFrames API. Try adapting it to RDDs and PySpark.

// imports
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// spark-session (not needed if you're in spark-shell)
implicit val spark: SparkSession = SparkSession.builder().appName("SO").getOrCreate()

// you'll be reading it from somewhere
val dfToBeModified: DataFrame = spark.createDataFrame(
  rowRDD = spark.sparkContext.parallelize(List(
    Row(1, "here is some text to replace with terms"),
    Row(2, "text to replace with terms"),
    Row(3, "text"),
    Row(4, "here is some text to replace"),
    Row(5, "text to replace")
  )),
  schema = StructType(List(
    StructField("id", IntegerType, false),
    StructField("text", StringType, false)
  ))
)

// it should preferably be read not as a dataframe but as a sequence  
val dfWithReplacements: DataFrame = spark.createDataFrame(
    rowRDD = spark.sparkContext.parallelize(List(
    Row("replace", "replacement1"),
    Row("text", "replacement2"),
    Row("is", "replacement3")
  )),
  schema = StructType(List(
    StructField("string", StringType, false),
    StructField("replacement_term", StringType, false)
  ))
)

// dfWithReplacements must not be too big or your executor will crash
val seqWithReplacements: Array[Row] = dfWithReplacements.collect()

// there you go
val dfWithModifications: DataFrame = seqWithReplacements.foldLeft(dfToBeModified) { (dfWithSomeModifications: DataFrame, row: Row) =>
    dfWithSomeModifications.withColumn("text", regexp_replace(dfWithSomeModifications("text"), row(0).toString, row(1).toString))
}

【Discussion】:

  • Thanks! I'll give it a try and post the PySpark version. Appreciate the help.
  • Hi @Shubham, how could a Map of replacements be used instead of the dfWithReplacements dataframe, e.g. val replacements = Map("replace" -> "replacement1", "text" -> "replacement2", "is" -> "replacement3")?
【Solution 2】:

So, assuming you cannot collect the replacement-terms rdd, but also assuming each replacement term is a single word:

First, flatten the text into words (and remember the word order).

Then do a left join to replace the words.

Then regroup the original texts.

replacement_terms_rdd = sc.parallelize([("replace", "replacement1"),
                                        ("text", "replacement2"),
                                        ("is", "replacement3")])

text_rdd = sc.parallelize([(1, "here is some text to replace with terms"),
                          (2, "text to replace with terms "),
                          (3, "text"),
                          (4, "here is some text to replace"),
                          (5, "text to replace")])

print (text_rdd\
.flatMap(lambda x: [(y[1], (x[0], y[0])) for y in enumerate(x[1].split())] )\
.leftOuterJoin(replacement_terms_rdd)\
.map(lambda x: (x[1][0][0], (x[1][0][1], x[1][1] or x[0]) ))\
.groupByKey().mapValues(lambda x: " ".join([y[1] for y in sorted(x)]))\
.collect())

Result:

[(1, 'here replacement3 some replacement2 to replacement1 with terms'), (2, 'replacement2 to replacement1 with terms'), (3, 'replacement2'), (4, 'here replacement3 some replacement2 to replacement1'), (5, 'replacement2 to replacement1')]

【Discussion】:

  • If your replacement terms are not restricted to a single word, but you know the maximum length of any replacement term, this can be solved fairly easily by taking ngrams of the text. It will affect performance, of course.
  • Thanks! The terms can indeed have multiple words; could you show the ngrams solution? Thanks!
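One possible sketch of the n-gram idea in plain Python (the mapping and the two-word term "some text" are made up for illustration; in Spark the same greedy scan could be applied per row, or the n-grams could be emitted in the flatMap step above):

```python
def replace_with_ngrams(text, mapping, max_n):
    """Greedy left-to-right scan: at each position, try the longest
    n-gram (up to max_n words) that appears in the mapping."""
    words = text.split()
    out = []
    i = 0
    while i < len(words):
        for n in range(max_n, 0, -1):          # longest match first
            gram = " ".join(words[i:i + n])
            if gram in mapping:
                out.append(mapping[gram])
                i += n
                break
        else:                                   # no n-gram matched
            out.append(words[i])
            i += 1
    return " ".join(out)

mapping = {"some text": "replacementX", "text": "replacement2"}
print(replace_with_ngrams("here is some text to replace", mapping, 2))
# -> "here is replacementX to replace"
```

Because the scan tries the longest n-gram first, "some text" wins over "text", matching the longest-first preference from the question.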