【问题标题】:Pyspark dataframe or parquet file to DynamoDBPyspark 数据框或镶木地板文件到 DynamoDB
【发布时间】:2021-05-13 12:10:33
【问题描述】:

我想将 pyspark 数据框或 parquet 文件放入 DynamoDB 表中

我拥有的 pyspark 数据框有 30MM 行和 20 列

解决方案 1:使用 boto3、pandas 和批量写入 (Amazon DynamoDB)

有了这个,我读取了 parquet 文件并将其传递给 pandas,然后我将逐行放入 DynamoDB 表中,但这花费的时间太长,非常慢

import boto3

dynamodb = boto3.resource('dynamodb', region_name='name')

table = dynamodb.Table('DynamoDB_table_name')
with table.batch_writer() as batch:
    for index, row in pandas_dataframe.iterrows():
      batch.put_item(
          Item = {
              'column_name_DynamoDB_table': int(row['column_name_in_pandas_dataframe']),
              ...
          }
      )

解决方案 2:使用 boto3、pyspark 和 SQL (how-to-write-pyspark-dataframe-to-dynamodb-table)

在这里,我在解决方案中描述的第 3 步中不断收到错误,ParseException 错误,我查看了亚马逊文档,发现代码是正确的 (EMR_Hive_Commands.html),也许是不是 SQL 代码,这是我的错误,但如果不是,我不知道是哪种语言

-- Step 1
DROP TABLE IF EXISTS TEMP;
CREATE TABLE TEMP(
        column_name_DynamoDB_table type,
        ... )
 STORED AS ORC; 

--step 2.1
pyspark_dataframe.createOrReplaceTempView("df")


--step 2.2
INSERT INTO temp
    SELECT *
    FROM df


--step 3
CREATE TABLE TEMPTODYNAMO(
        column_name_DynamoDB_table type,
        ... )
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ( "dynamodb.table.name" ="temp-to-dynamo" , 
                "dynamodb.column.mapping" = "col1:column_name_DynamoDB_table,...");

我不断收到的错误:

Error in SQL statement: ParseException: 
Operation not allowed: STORED BY(line 22, pos 0)

== SQL ==
CREATE TABLE TEMPTODYNAMO(
        column_name_DynamoDB_table type,
        ...  )
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
^^^
TBLPROPERTIES ( "dynamodb.table.name" ="temp-to-dynamo" , 
                "dynamodb.column.mapping" = "col1:column_name_DynamoDB_table,...")

解决方案 3:使用 boto3、pyspark 和 com.audienceproject (Spark+DynamoDB)

我没看懂代码里放什么,页面中显示的python代码是:

# Python
# Load a DataFrame from a Dynamo table. Only incurs the cost of a single scan for schema inference.
dynamoDf = spark.read.option("tableName", "SomeTableName") \
                     .format("dynamodb") \
                     .load() # <-- DataFrame of Row objects with inferred schema.

# Scan the table for the first 100 items (the order is arbitrary) and print them.
dynamoDf.show(100)

# write to some other table overwriting existing item with same keys
dynamoDf.write.option("tableName", "SomeOtherTable") \
              .format("dynamodb") \
              .save()

但我真的不知道将我的 DynamoDB 表的名称和我的 pyspark 数据框放在哪里

更新:我试过了

pysaprk_dataframe.write.option("tableName", "name_DynamoDB_table") \
                .format("dynamodb") \
                .save()

得到了这个错误:

AnalysisException: TableProvider implementation dynamodb cannot be written with ErrorIfExists mode, please use Append or Overwrite modes instead

问候

【问题讨论】:

  • 我尝试了“解决方案 2”,效果很好。尝试从 Spark SQL 运行查询“步骤 3”时,我也收到了 ParseException 错误。相反,我必须从 Hive CLI 运行查询“步骤 3”,这解决了问题。见here

标签: python pandas pyspark amazon-dynamodb


【解决方案1】:

已尝试解决方案 #3,使用以下代码 sn-p 并使其正常工作

代码更改是添加模式('append')

dynamoDf.write.mode('append').option("tableName","db_dev_users_v2") \
              .option("region",region) \
              .format("dynamodb") \
              .save()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2022-06-16
    • 1970-01-01
    • 2023-01-03
    • 2023-03-24
    • 2020-10-23
    • 2018-08-13
    • 2020-04-03
    • 1970-01-01
    相关资源
    最近更新 更多