【Question Title】: pySpark DataFrames .groupBy() insert a string of values
【Posted】: 2015-07-18 13:00:44
【Question】:

My code looks like this:

print df.groupBy('offer_id', 'record_id').avg().collect()

This works, but I have a string:

print df.groupBy(stringNamesDF).avg().collect()

which fails with:

org.apache.spark.sql.AnalysisException: cannot resolve ''record_id', 'assigned_offer_id', 'accepted_offer_flag', 'current_offer_flag', 'offer_good_until_date', 'rescinded_date', 'first_pymt_date', 'contract_date', 'acct_nbr', 'acct_nbr_assigned_dttm', 'acct_expiration_dttm', 'offer_desc', 'offer_sales_script', 'presentable_flag', 'insrt_dttm', 'insrt_usr_id', 'chng_dttm', 'chng_usr_id', 'actv_flag', 'correlation_id', 'offer_status_type_cd', 'presentation_instrument_nbr'' given input columns pymt, rescindable_days, rescinded_date, market_cell_id, offer_sales_script, assigned_offer_id, offer_desc, rate_index_type_cd, nbr_of_pymts, campaign_id, down_pymt, offer_status_type_cd, offer_type_cd, acct_expiration_dttm, record_id, origination_fee_rate, insrt_usr_id, promo_id, term_mm, min_amount, offer_good_until_date, decision_id, insrt_dttm, late_fee_min_amount, late_fee_percent, offer_id, origination_fee_amount, presentation_instrument_nbr, offer_order, chng_usr_id, correlation_id, acct_nbr_assigned_dttm, chng_dttm, presentable_flag, accepted_offer_flag, amount, min_rate, max_rate, acct_nbr, actv_flag, sub_product_id, cs_result_id, current_offer_flag, finance_charge, annual_fee_waived_mm, cs_result_usage_type_cd, max_amount, total_pymts, contract_date, index_rate, first_pymt_date, annual_fee_amount, rate, amount_financed, pymt_method_type_cd;

stringNamesDF prints as:

'record_id', 'assigned_offer_id', 'accepted_offer_flag', 'current_offer_flag', 'offer_good_until_date', 'rescinded_date', 'first_pymt_date', 'contract_date', 'acct_nbr', 'acct_nbr_assigned_dttm', 'acct_expiration_dttm', 'offer_desc', 'offer_sales_script', 'presentable_flag', 'insrt_dttm', 'insrt_usr_id', 'chng_dttm', 'chng_usr_id', 'actv_flag', 'correlation_id', 'offer_status_type_cd', 'presentation_instrument_nbr'

I have also tried stringNamesDF looking like this:

record_id, assigned_offer_id, accepted_offer_flag, current_offer_flag, offer_good_until_date, rescinded_date, first_pymt_date, contract_date, acct_nbr, acct_nbr_assigned_dttm, acct_expiration_dttm, offer_desc, offer_sales_script, presentable_flag, insrt_dttm, insrt_usr_id, chng_dttm, chng_usr_id, actv_flag, correlation_id, offer_status_type_cd, presentation_instrument_nbr

but got this:

org.apache.spark.sql.AnalysisException: cannot resolve 'record_id, assigned_offer_id, accepted_offer_flag, current_offer_flag, offer_good_until_date, rescinded_date, first_pymt_date, contract_date, acct_nbr, acct_nbr_assigned_dttm, acct_expiration_dttm, offer_desc, offer_sales_script, presentable_flag, insrt_dttm, insrt_usr_id, chng_dttm, chng_usr_id, actv_flag, correlation_id, offer_status_type_cd, presentation_instrument_nbr' given input columns pymt, rescindable_days, rescinded_date, market_cell_id, offer_sales_script, assigned_offer_id, offer_desc, rate_index_type_cd, nbr_of_pymts, campaign_id, down_pymt, offer_status_type_cd, offer_type_cd, acct_expiration_dttm, record_id, origination_fee_rate, insrt_usr_id, promo_id, term_mm, min_amount, offer_good_until_date, decision_id, insrt_dttm, late_fee_min_amount, late_fee_percent, offer_id, origination_fee_amount, presentation_instrument_nbr, offer_order, chng_usr_id, correlation_id, acct_nbr_assigned_dttm, chng_dttm, presentable_flag, accepted_offer_flag, amount, min_rate, max_rate, acct_nbr, actv_flag, sub_product_id, cs_result_id, current_offer_flag, finance_charge, annual_fee_waived_mm, cs_result_usage_type_cd, max_amount, total_pymts, contract_date, index_rate, first_pymt_date, annual_fee_amount, rate, amount_financed, pymt_method_type_cd;

Edit: I also tried stringNames as a list, with no luck:

['record_id', 'assigned_offer_id', 'accepted_offer_flag', 'current_offer_flag', 'offer_good_until_date', 'rescinded_date', 'first_pymt_date', 'contract_date', 'acct_nbr', 'acct_nbr_assigned_dttm', 'acct_expiration_dttm', 'offer_desc', 'offer_sales_script', 'presentable_flag', 'insrt_dttm', 'insrt_usr_id', 'chng_dttm', 'chng_usr_id', 'actv_flag', 'correlation_id', 'offer_status_type_cd', 'presentation_instrument_nbr']

and got AttributeError: 'list' object has no attribute '_get_object_id'

【Comments】:

  • It should be a list: stringNamesDF=['record_id', 'assigned_offer_id', 'accepted_offer_flag', 'current_offer_flag', 'offer_good_until_date', 'rescinded_date', 'first_pymt_date', 'contract_date', 'acct_nbr', 'acct_nbr_assigned_dttm', 'acct_expiration_dttm', 'offer_desc', 'offer_sales_script', 'presentable_flag', 'insrt_dttm', 'insrt_usr_id', 'chng_dttm', 'chng_usr_id', 'actv_flag', 'correlation_id', 'offer_status_type_cd', 'presentation_instrument_nbr']
  • Still doesn't work, see my edit.

Tags: apache-spark dataframe pyspark


【Solution 1】:

Try:

print df.groupBy(stringNamesDF.split(", ")).avg().collect()
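Worth noting: split(", ") only produces clean names for the unquoted, comma-plus-space variant of the string; the quoted variant from the question would leave literal ' characters in each name, which Spark would again fail to resolve. A minimal plain-Python sketch (no Spark needed; column list shortened for illustration):

```python
# Unquoted comma-separated string -> list of clean column names
stringNamesDF = "record_id, assigned_offer_id, accepted_offer_flag"
cols = stringNamesDF.split(", ")
print(cols)  # ['record_id', 'assigned_offer_id', 'accepted_offer_flag']

# If the string carries literal single quotes, strip those too:
quoted = "'record_id', 'assigned_offer_id', 'accepted_offer_flag'"
cols2 = [c.strip().strip("'") for c in quoted.split(",")]
print(cols2)  # ['record_id', 'assigned_offer_id', 'accepted_offer_flag']
```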

【Discussion】:

    【Solution 2】:

    When stringNamesDF is a list:

    stringNamesDF=['record_id', 'assigned_offer_id', 'accepted_offer_flag', 'current_offer_flag', 'offer_good_until_date', 'rescinded_date', 'first_pymt_date', 'contract_date', 'acct_nbr', 'acct_nbr_assigned_dttm', 'acct_expiration_dttm', 'offer_desc', 'offer_sales_script', 'presentable_flag', 'insrt_dttm', 'insrt_usr_id', 'chng_dttm', 'chng_usr_id', 'actv_flag', 'correlation_id', 'offer_status_type_cd', 'presentation_instrument_nbr']  
    

    use:

    df.groupBy(*stringNamesDF).avg().collect()
    

    From https://spark.apache.org/docs/latest/api/python/pyspark.sql.html

    groupBy(*cols)

    Groups the DataFrame using the specified columns, so we can run aggregation on them.

    Parameters: cols – list of columns to group by. Each element should be a column name (string) or an expression (Column).

    Example:

    l = [('Alice','2015-02-02', 1), ('Alice','2015-02-02', 2), ('Alice','2015-02-03', 1), ('Bob','2015-02-03', 1), ('Bob','2015-02-03', 3)]

    ddf = sqlContext.createDataFrame(l, ['name', 'date','clicks'])

    ddf.groupBy(*['name','date']).avg().collect()
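The * is the key part: groupBy(*cols) takes column names as separate positional arguments, and * unpacks the list into exactly that. Passing the bare list in this Spark version is presumably what produced the '_get_object_id' AttributeError, as the list itself got treated as a single column object. A plain-Python sketch of the difference (group_by here is an illustrative stand-in, not Spark's API):

```python
# Illustrative stand-in for a *args function like DataFrame.groupBy
def group_by(*cols):
    return cols

names = ['name', 'date']
print(group_by(*names))  # ('name', 'date')   -- two string arguments
print(group_by(names))   # (['name', 'date'],) -- one list argument
```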

    【Discussion】:
