免责声明:
Spark DataFrame 通常没有严格定义的顺序。使用风险自负。
为现有DataFrame添加索引:
from pyspark.sql.types import *
df_index = spark.createDataFrame(
df.rdd.zipWithIndex(),
StructType([StructField("data", df.schema), StructField("id", LongType())])
)
将索引添加到RDD 并转换为DataFrame:
words_df = spark.createDataFrame(
words_rdd.zipWithIndex(),
StructType([
StructField("words", ArrayType(StringType())),
StructField("id", LongType())
])
)
加入两者并选择必填字段:
df_index.join(words_df, "id").select("data.*", "words")
注意
有不同的解决方案,它们可能在特定情况下有效,但不能保证性能和/或正确性。其中包括:
- 使用
monotonically_increasing_id 作为join 键 - 一般情况下不正确。
- 使用
row_number() 窗口函数作为连接键 - 不可接受的性能影响,如果没有定义特定的顺序,通常是不正确的。
- 在
RDDs 上使用zip - 当且仅当两个结构具有相同的数据分布时才能工作(在这种情况下应该工作)。
注意:
在这种特定情况下,您不需要RDD。 pyspark.ml.feature提供了多种Transformers,应该很适合你。
from pyspark.ml.feature import *
from pyspark.ml import Pipeline
df = spark.createDataFrame(
["the cat and dog ran", "we went to the park", "today it will rain"],
"string"
).toDF("Articles")
Pipeline(stages=[
RegexTokenizer(inputCol="Articles", outputCol="Tokens"),
StopWordsRemover(inputCol="Tokens", outputCol="Words")
]).fit(df).transform(df).show()
# +-------------------+--------------------+---------------+
# | Articles| Tokens| Words|
# +-------------------+--------------------+---------------+
# |the cat and dog ran|[the, cat, and, d...|[cat, dog, ran]|
# |we went to the park|[we, went, to, th...| [went, park]|
# | today it will rain|[today, it, will,...| [today, rain]|
# +-------------------+--------------------+---------------+
可以使用StopWordsRemover的stopWords参数提供停用词列表,例如:
StopWordsRemover(
inputCol="Tokens",
outputCol="Words",
stopWords=["the", "and", "we", "to", "it"]
)