【问题标题】:Add new rows to pyspark Dataframe向 pyspark Dataframe 添加新行
【发布时间】:2018-10-07 05:03:12
【问题描述】:

我是一个非常新的 pyspark,但对 pandas 很熟悉。 我有一个 pyspark 数据框

# instantiate Spark
spark = SparkSession.builder.getOrCreate()

# make some test data
columns = ['id', 'dogs', 'cats']
vals = [
     (1, 2, 0),
     (2, 0, 1)
]

# create DataFrame
df = spark.createDataFrame(vals, columns)

想要添加新行 (4,5,7) 以便输出:

df.show()
+---+----+----+
| id|dogs|cats|
+---+----+----+
|  1|   2|   0|
|  2|   0|   1|
|  4|   5|   7|
+---+----+----+

【问题讨论】:

  • 您能否指出答案是否可以接受等。如果不是,请另行告知。

标签: python apache-spark pyspark


【解决方案1】:

正如thebluephantom 已经说过联合是要走的路。我只是在回答你的问题,给你一个 pyspark 的例子:

# if not already created automatically, instantiate Sparkcontext
spark = SparkSession.builder.getOrCreate()

columns = ['id', 'dogs', 'cats']
vals = [(1, 2, 0), (2, 0, 1)]

df = spark.createDataFrame(vals, columns)

newRow = spark.createDataFrame([(4,5,7)], columns)
appended = df.union(newRow)
appended.show()

请查看 databricks 常见问题解答:https://kb.databricks.com/data/append-a-row-to-rdd-or-dataframe.html

【讨论】:

    【解决方案2】:

    从我所做的事情来看,使用 union,显示一个块部分编码 - 你当然需要适应自己的情况:

    val dummySchema = StructType(
    StructField("phrase", StringType, true) :: Nil)
    var dfPostsNGrams2 = spark.createDataFrame(sc.emptyRDD[Row], dummySchema)
    for (i <- i_grams_Cols) {
        val nameCol = col({i})
        dfPostsNGrams2 = dfPostsNGrams2.union(dfPostsNGrams.select(explode({nameCol}).as("phrase")).toDF )
     }
    

    DF 与自身的联合是要走的路。

    【讨论】:

    • 这个例子有点出路,但它是关于UNION的。
    【解决方案3】:

    另一种选择是使用分区拼花格式,并为要附加的每个数据帧添加一个额外的拼花文件。通过这种方式,您可以创建(数百、数千、数百万)个 parquet 文件,当您稍后读取目录时,spark 会将它们作为一个联合文件读取。

    本例使用 pyarrow

    注意,如果您已经知道要将单个 parquet 文件放在哪里,我还展示了如何编写未分区的单个 parquet (example.parquet)。

    import pyarrow.parquet as pq
    import pandas as pd
    
    headers=['A', 'B', 'C']
    
    row1 = ['a1', 'b1', 'c1']
    row2 = ['a2', 'b2', 'c2']
    
    df1 = pd.DataFrame([row1], columns=headers)
    df2 = pd.DataFrame([row2], columns=headers)
    
    df3 = df1.append(df2, ignore_index=True)
    
    
    table = pa.Table.from_pandas(df3)
    
    pq.write_table(table, 'example.parquet', flavor='spark')
    pq.write_to_dataset(table, root_path="test_part_file", partition_cols=['B', 'C'], flavor='spark')
    
    # Adding a new partition (B=b2/C=c3
    
    
    row3 = ['a3', 'b3', 'c3']
    df4 = pd.DataFrame([row3], columns=headers)
    
    table2 = pa.Table.from_pandas(df4)
    pq.write_to_dataset(table2, root_path="test_part_file", partition_cols=['B', 'C'], flavor='spark')
    
    # Add another parquet file to the B=b2/C=c2 partition
    # Note this does not overwrite existing partitions, it just appends a new .parquet file.
    # If files already exist, then you will get a union result of the two (or multiple) files when you read the partition
    row5 = ['a5', 'b2', 'c2']
    df5 = pd.DataFrame([row5], columns=headers)
    table3 = pa.Table.from_pandas(df5)
    pq.write_to_dataset(table3, root_path="test_part_file", partition_cols=['B', 'C'], flavor='spark')
    

    之后读取输出

    from pyspark.sql import SparkSession
    
    spark = (SparkSession
             .builder
             .appName("testing parquet read")
             .getOrCreate())
    
    df_spark = spark.read.parquet('test_part_file')
    df_spark.show(25, False)
    

    你应该会看到这样的东西

    +---+---+---+
    |A  |B  |C  |
    +---+---+---+
    |a5 |b2 |c2 |
    |a2 |b2 |c2 |
    |a1 |b1 |c1 |
    |a3 |b3 |c3 |
    +---+---+---+
    

    如果您再次端到端地运行相同的东西,您应该会看到类似这样的重复项(因为之前的所有 parquet 文件仍然存在,所以火花联合它们)。

    +---+---+---+
    |A  |B  |C  |
    +---+---+---+
    |a2 |b2 |c2 |
    |a5 |b2 |c2 |
    |a5 |b2 |c2 |
    |a2 |b2 |c2 |
    |a1 |b1 |c1 |
    |a1 |b1 |c1 |
    |a3 |b3 |c3 |
    |a3 |b3 |c3 |
    +---+---+---+
    

    【讨论】:

      猜你喜欢
      • 2018-09-24
      • 2016-02-14
      • 2019-04-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多