【问题标题】:pyspark error: AttributeError: 'SparkSession' object has no attribute 'parallelize'pyspark 错误:AttributeError:'SparkSession' 对象没有属性'parallelize'
【发布时间】:2017-01-24 02:00:47
【问题描述】:

我在 Jupyter 笔记本上使用 pyspark。以下是 Spark 的设置方式:

import findspark
findspark.init(spark_home='/home/edamame/spark/spark-2.0.0-bin-spark-2.0.0-bin-hadoop2.6-hive', python_path='python2.7')

    import pyspark
    from pyspark.sql import *

    sc = pyspark.sql.SparkSession.builder.master("yarn-client").config("spark.executor.memory", "2g").config('spark.driver.memory', '1g').config('spark.driver.cores', '4').enableHiveSupport().getOrCreate()

    sqlContext = SQLContext(sc)

然后当我这样做时:

spark_df = sqlContext.createDataFrame(df_in)

其中df_in 是熊猫数据框。然后我收到以下错误:

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-9-1db231ce21c9> in <module>()
----> 1 spark_df = sqlContext.createDataFrame(df_in)


/home/edamame/spark/spark-2.0.0-bin-spark-2.0.0-bin-hadoop2.6-hive/python/pyspark/sql/context.pyc in createDataFrame(self, data, schema, samplingRatio)
    297         Py4JJavaError: ...
    298         """
--> 299         return self.sparkSession.createDataFrame(data, schema, samplingRatio)
    300 
    301     @since(1.3)

/home/edamame/spark/spark-2.0.0-bin-spark-2.0.0-bin-hadoop2.6-hive/python/pyspark/sql/session.pyc in createDataFrame(self, data, schema, samplingRatio)
    520             rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    521         else:
--> 522             rdd, schema = self._createFromLocal(map(prepare, data), schema)
    523         jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
    524         jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), schema.json())

/home/edamame/spark/spark-2.0.0-bin-spark-2.0.0-bin-hadoop2.6-hive/python/pyspark/sql/session.pyc in _createFromLocal(self, data, schema)
    400         # convert python objects to sql data
    401         data = [schema.toInternal(row) for row in data]
--> 402         return self._sc.parallelize(data), schema
    403 
    404     @since(2.0)

AttributeError: 'SparkSession' object has no attribute 'parallelize'

有谁知道我做错了什么?谢谢!

【问题讨论】:

    标签: python hadoop pandas apache-spark pyspark


    【解决方案1】:

    SparkSession 不是SparkContext 的替代品,而是SQLContext 的等价物。只需使用与使用 SQLContext 相同的方式即可:

    spark.createDataFrame(...)
    

    如果您必须访问SparkContext,请使用sparkContext 属性:

    spark.sparkContext
    

    因此,如果您需要 SQLContext 以实现向后兼容性,您可以:

    SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
    

    【讨论】:

    • 然后我尝试了: spark_df = sc.createDataFrame(df_in) ,但 spark_df 似乎已损坏。 spark_df = sc.createDataFrame(df_in) 是在这里进行转换的正确方法吗?
    • 仅当df_increateDataFrame 的有效参数时。
    • df_in 是一个熊猫数据框。我认为它应该是有效的?
    • 非常感谢@zero323
    【解决方案2】:

    每当我们尝试从向后兼容的对象(如 RDD)或由 spark 会话创建的数据框创建 DF 时,您需要让您的 SQL 上下文感知您的会话和上下文。

    例如:

    如果我创建一个 RDD:

    ss=SparkSession.builder.appName("vivek").master('local').config("k1","vi").getOrCreate()
    
    rdd=ss.sparkContext.parallelize([('Alex',21),('Bob',44)])
    

    但是如果我们希望从这个 RDD 创建一个 df,我们需要

    sq=SQLContext(sparkContext=ss.sparkContext, sparkSession=ss)

    那么只有我们可以将 SQLContext 与 pandas 创建的 RDD/DF 一起使用。

    schema = StructType([
       StructField("name", StringType(), True),
       StructField("age", IntegerType(), True)])
    df=sq.createDataFrame(rdd,schema)
    df.collect()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2019-05-17
      • 2017-03-10
      • 2023-02-10
      • 2016-11-30
      • 2018-03-21
      • 2021-11-07
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多