【问题标题】:Pyspark DataFrame loopPyspark DataFrame 循环
【发布时间】:2020-04-16 12:55:32
【问题描述】:

我是 Python 和 DataFrame 的新手。在这里,我正在编写 Python 代码以在 AWS Glue 中运行 ETL 作业。请在下面找到相同的代码 sn-p。

test_DyF = glueContext.create_dynamic_frame.from_catalog(database="teststoragedb", table_name="testtestfile_csv")
test_dataframe = test_DyF.select_fields(['empid','name']).toDF()

现在上面的 test_dataframe 的类型是pyspark.sql.dataframe.DataFrame

现在,我需要遍历上面的 test_dataframe。据我所知,我只能看到collecttoLocalIterator。请找到下面的示例代码

for row_val in test_dataframe.collect():

但这两种方法都非常缓慢且效率不高。我不能使用 pandas,因为 AWS Glue 不支持它。

请找出我正在做的步骤

来源信息:

productid|matchval|similar product|similar product matchval
product A|100|product X|100
product A|101|product Y|101
product B|100|product X|100
product C|102|product Z|102

预期结果:

product |similar products
product A|product X, product Y
product B|product X
product C|product Z

这是我正在编写的代码

  1. 我正在使用 productID 获得源的不同数据框
  2. 循环遍历这个不同的数据框集

    a) 从源中获取产品的 matchval 列表

    b) 根据 matchval 过滤器识别相似产品

    c) 循环获取连接字符串 ---> 使用 rdd.collect 的循环会影响性能

您能否就可以做的事情分享任何更好的建议?

【问题讨论】:

  • 你试图通过循环实现的逻辑是什么?
  • 嗨@MohammadMurtazaHashmi,基本上我有嵌套循环,我在内部for循环中遇到这个问题,因为collect()将被多次调用。我需要在内部循环中以逗号分隔格式获取名称。

标签: python amazon-web-services dataframe pyspark aws-glue


【解决方案1】:

请详细说明您想尝试的逻辑。 DF 循环可以通过 SQL 方法完成,也可以按照以下 RDD 方法进行

def my_function(each_record):
#my_logic

#loop through for each command. 
df.rdd.foreach(my_function)

根据您的输入进一步添加以下代码

df = spark.read.csv("/mylocation/61250775.csv", header=True, inferSchema=True, sep="|")
seq = ['product X','product Y','product Z']
df2 = df.groupBy("productid").pivot("similar_product",seq).count()

+---------+---------+---------+---------+
|productid|product X|product Y|product Z|
+---------+---------+---------+---------+
|product B|        1|     null|     null|
|product A|        1|        1|     null|
|product C|     null|     null|        1|
+---------+---------+---------+---------+

符合您要求的最终方法

df = spark.read.csv("/mylocation/61250775.csv", header=True, inferSchema=True, sep="|") df.printSchema()

>>> df.printSchema()
root
 |-- id: string (nullable = true)
 |-- matchval1: integer (nullable = true)
 |-- similar: string (nullable = true)
 |-- matchval3: integer (nullable = true)


from pyspark.sql.functions import concat_ws
from pyspark.sql.functions import collect_list
dfx = df.groupBy("id").agg(concat_ws(",", collect_list("similar")).alias("Similar_Items")).select(col("id"), col("Similar_Items"))
dfx.show()

+---------+-------------------+
|       id|      Similar_Items|
+---------+-------------------+
|product B|          product X|
|product A|product X,product Y|
|product C|          product Z|
+---------+-------------------+

【讨论】:

  • 您好,感谢您的回复。我已在问题中添加了详细信息。你能查一下吗?
  • 感谢 H Roy 提供的详细步骤。根据我所看到的,这可以在没有循环的情况下实现。
【解决方案2】:

您也可以使用 MAP 类。就我而言,我正在遍历数据并计算整行的哈希值。

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import hashlib

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "load-test", table_name = "table_test", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "load-test", table_name = "table_test", transformation_ctx = "datasource0")

def hash_calculation(rec):
    md5 = hashlib.md5()
    md5.update('{}_{}_{}_{}'.format(rec["funcname"],rec["parameter"],rec["paramtype"],rec["structure"]).encode())
    rec["hash"]  = md5.hexdigest()
    print("looping the recs")
    return rec
    
mapped_dyF =  Map.apply(frame = datasource0, f = hash_calculation)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2016-07-20
    • 1970-01-01
    • 2020-07-09
    • 2022-12-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-28
    相关资源
    最近更新 更多