【问题标题】:Converting Pandas DataFrame to Spark DataFrame将 Pandas DataFrame 转换为 Spark DataFrame
【发布时间】:2017-03-17 15:35:50
【问题描述】:

我之前问过一个关于如何Convert scipy sparse matrix to pyspark.sql.dataframe.DataFrame 的问题,并在阅读了提供的答案以及this article 后取得了一些进展。我最终找到了以下代码,用于将 scipy.sparse.csc_matrix 转换为 pandas 数据框:

df = pd.DataFrame(csc_mat.todense()).to_sparse(fill_value=0)
df.columns = header

然后我尝试使用建议的语法将 pandas 数据帧转换为 spark 数据帧:

spark_df = sqlContext.createDataFrame(df)

但是,我得到以下错误:

ValueError: cannot create an RDD from type: <type 'list'>

我不相信它与 sqlContext 有任何关系,因为我能够将另一个大致相同大小的 pandas 数据帧转换为 spark 数据帧,没问题。有什么想法吗?

【问题讨论】:

  • 你运行的是什么版本?我觉得还可以
  • 在转换为 Spark DF 之前尝试 print df。你可能会得到一些关于list 类型的线索。
  • 打印部分数据帧(100K 行,5300 列)后,我注意到的唯一特征是每列的 dtype 为“float64”,因此每个数字都表示为带有尾随零的数量。然而,只有前 10 列需要浮动。不过,我不确定这是否是导致错误的原因。
  • 如果您可以在此处打印 pandas 数据框的示例输出,那么它将帮助我们解决问题

标签: python pandas dataframe pyspark spark-dataframe


【解决方案1】:

我不确定这个问题是否仍然与当前版本的 pySpark 相关,但这是我在发布这个问题几周后制定的解决方案。代码相当丑陋,可能效率低下,但由于对这个问题的持续兴趣,我将其发布在这里。:

from pyspark import SparkContext
from pyspark.sql import HiveContext
from pyspark import SparkConf
from py4j.protocol import Py4JJavaError

myConf = SparkConf(loadDefaults=True)
sc = SparkContext(conf=myConf)
hc = HiveContext(sc)


def chunks(lst, k):
    """Yield k chunks of close to equal size"""
    n = len(lst) / k
    for i in range(0, len(lst), n):
        yield lst[i: i + n]


def reconstruct_rdd(lst, num_parts):
    partitions = chunks(lst, num_parts)
    for part in range(0, num_parts - 1):
        print "Partition ", part, " started..."
        partition = next(partitions)    # partition is a list of lists
        if part == 0:
            prime_rdd = sc.parallelize(partition)
        else:
            second_rdd = sc.parallelize(partition)
            prime_rdd = prime_rdd.union(second_rdd)
        print "Partition ", part, " complete!"
    return prime_rdd


def build_col_name_list(len_cols):
    name_lst = []
    for i in range(1, len_cols):
        idx = "_" + str(i)
        name_lst.append(idx)
    return name_lst


def set_spark_df_header(header, sdf):
    oldColumns = build_col_name_lst(len(sdf.columns))
    newColumns = header
    sdf = reduce(lambda sdf, idx: sdf.withColumnRenamed(oldColumns[idx], newColumns[idx]), xrange(len(oldColumns)), sdf)
    return sdf


def convert_pdf_matrix_to_sdf(pdf, sdf_header, num_of_parts):
    try:
        sdf = hc.createDataFrame(pdf)
    except ValueError:
        lst = pdf.values.tolist()   #Need to convert to list of list to parallelize
        try:
            rdd = sc.parallelize(lst)
        except Py4JJavaError:
            rdd = reconstruct_rdd(lst, num_of_parts)
            sdf = hc.createDataFrame(rdd)
            sdf = set_spark_df_header(sdf_header, sdf)
    return sdf

【讨论】:

    【解决方案2】:

    to_sparse(fill_value=0) 基本上已经过时了。只需使用标准变体

    sqlContext.createDataFrame(pd.DataFrame(csc_mat.todense()))
    

    只要类型兼容就可以了。

    【讨论】:

    • OP 询问的是火花,而不是稀疏
    猜你喜欢
    • 2018-11-30
    • 2017-03-23
    • 2018-07-10
    • 1970-01-01
    • 2015-12-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多