【问题标题】:Add PySpark RDD as new column to pyspark.sql.dataframe将 PySpark RDD 作为新列添加到 pyspark.sql.dataframe
【发布时间】:2017-06-26 17:07:39
【问题描述】:

我有一个 pyspark.sql.dataframe,其中每一行都是一篇新闻文章。然后我有一个 RDD,它代表每篇文章中包含的单词。我想将单词的 RDD 作为名为“单词”的列添加到我的新文章数据框中。我试过了

df.withColumn('words', words_rdd )

但我得到了错误

AssertionError: col should be Column

DataFrame 看起来像这样

Articles
the cat and dog ran
we went to the park
today it will rain

但我有 3k 篇新闻文章。

我应用了一个函数来清理文本,例如删除停用词,我有一个如下所示的 RDD:

[[cat, dog, ran],[we, went, park],[today, will, rain]]

我正在尝试让我的 Dataframe 看起来像这样:

Articles                 Words
the cat and dog ran      [cat, dog, ran]
we went to the park      [we, went, park]
today it will rain       [today, will, rain]

【问题讨论】:

  • 请分享示例数据,但您可能需要加入。
  • 它们是如何匹配的?为什么 [the, cat, and, dog, ran] 的词与 cat and dog ran 的文章而不是另一篇文章匹配?

标签: python apache-spark pyspark


【解决方案1】:

免责声明

Spark DataFrame 通常没有严格定义的顺序。使用风险自负。

为现有DataFrame添加索引:

from pyspark.sql.types import *

df_index = spark.createDataFrame(
    df.rdd.zipWithIndex(),
    StructType([StructField("data", df.schema), StructField("id", LongType())])
)

将索引添加到RDD 并转换为DataFrame

words_df = spark.createDataFrame(
    words_rdd.zipWithIndex(),
    StructType([
        StructField("words", ArrayType(StringType())),
        StructField("id", LongType())
    ])
)

加入两者并选择必填字段:

df_index.join(words_df, "id").select("data.*", "words")

注意

有不同的解决方案,它们可能在特定情况下有效,但不能保证性能和/或正确性。其中包括:

  • 使用monotonically_increasing_id 作为join 键 - 一般情况下不正确。
  • 使用 row_number() 窗口函数作为连接键 - 不可接受的性能影响,如果没有定义特定的顺序,通常是不正确的。
  • RDDs 上使用zip - 当且仅当两个结构具有相同的数据分布时才能工作(在这种情况下应该工作)。

注意

在这种特定情况下,您不需要RDDpyspark.ml.feature提供了多种Transformers,应该很适合你。

from pyspark.ml.feature import *
from pyspark.ml import Pipeline

df = spark.createDataFrame(
     ["the cat and dog ran", "we went to the park", "today it will rain"],
         "string"
).toDF("Articles")

Pipeline(stages=[
    RegexTokenizer(inputCol="Articles", outputCol="Tokens"), 
    StopWordsRemover(inputCol="Tokens", outputCol="Words")
]).fit(df).transform(df).show()
# +-------------------+--------------------+---------------+
# |           Articles|              Tokens|          Words|
# +-------------------+--------------------+---------------+
# |the cat and dog ran|[the, cat, and, d...|[cat, dog, ran]|
# |we went to the park|[we, went, to, th...|   [went, park]|
# | today it will rain|[today, it, will,...|  [today, rain]|
# +-------------------+--------------------+---------------+

可以使用StopWordsRemoverstopWords参数提供停用词列表,例如:

StopWordsRemover(
    inputCol="Tokens",
    outputCol="Words",
    stopWords=["the", "and", "we", "to", "it"]
)

【讨论】:

  • 在我的例子中,所有行都是随机生成的,所以顺序并不重要。这应该很好用,谢谢。
  • @bendl 一般来说,您仍然更喜欢不需要组合结构的方法。如果您生成随机数据,那么udfmap 应该可以正常工作。
【解决方案2】:

为什么要将rdd加入回数据框,我宁愿直接从“文章”创建一个新列。有多种方法可以做到,这是我的 5 美分:

from pyspark.sql import Row
from pyspark.sql.context import SQLContext
sqlCtx = SQLContext(sc)    # sc is the sparkcontext

x = [Row(Articles='the cat and dog ran'),Row(Articles='we went to the park'),Row(Articles='today it will rain')]
df = sqlCtx.createDataFrame(x)

df2 = df.map(lambda x:tuple([x.Articles,x.Articles.split(' ')])).toDF(['Articles','words'])
df2.show()

你会得到以下输出:

Articles                 words
the cat and dog ran      [the, cat, and, dog, ran]
we went to the park      [we, went, to, the, park]
today it will rain       [today, it, will, rain]

如果您想实现其他目标,请告诉我。

【讨论】:

  • 这几乎是我想要的,但我有 3k 篇文章,我想对这些文章中的每一篇应用一个函数来执行一些清理(不仅仅是拆分)并将它放在一个数据框中,就像你上面一样.这是我第一次使用 pyspark,所以我不确定最好的方法。
  • 您能否提供一个实际数据的示例文件。任何函数都可以在 udf in spark 的帮助下应用
  • 使用 udf 让它工作:newdf = df.withColumn("words",udf_clean_text("articles")) 谢谢!!
  • 如果这解决了您的问题,请务必将此答案标记为正确。
【解决方案3】:

一个简单但有效的方法是使用udf。你可以:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

df = spark.createDataFrame(["the cat and dog ran", "we went to the park", "today it will rain", None], 
"string" ).toDF("Articles")

split_words = udf(lambda x : x.split(' ') if x is not None else x, StringType())
df = df.withColumn('Words', split_words(df['Articles']))

df.show(10,False)
>>
+-------------------+-------------------------+
|Articles           |Words                    |
+-------------------+-------------------------+
|the cat and dog ran|[the, cat, and, dog, ran]|
|we went to the park|[we, went, to, the, park]|
|today it will rain |[today, it, will, rain]  |
|null               |null                     |
+-------------------+-------------------------+

我添加了对 None 的检查,因为在您的数据中通常会有坏行。您可以在拆分后或之前轻松删除它们,使用 dropna。

但在我看来,如果您想将此作为文本分析的准备任务,那么按照@user9613318 在他的回答中建议的那样构建管道可能符合您的最大利益

【讨论】:

    【解决方案4】:
    rdd1 = spark.sparkContext.parallelize([1, 2, 3, 5])
    # make some transformation on rdd1:
    rdd2 = rdd.map(lambda n: True if n % 2 else False)
    # Append each row in rdd2 to those in rdd1.
    rdd1.zip(rdd2).collect()
    

    【讨论】:

    • 请解释您的代码如何以及为什么解决问题或改进问题中的代码。
    猜你喜欢
    • 1970-01-01
    • 2017-11-07
    • 1970-01-01
    • 1970-01-01
    • 2020-09-12
    • 1970-01-01
    • 2020-12-06
    • 2021-05-14
    • 2019-12-27
    相关资源
    最近更新 更多