【问题标题】:How to Store a Python bytestring in a Spark Dataframe如何在 Spark Dataframe 中存储 Python 字节串
【发布时间】:2017-05-12 19:26:57
【问题描述】:

我在看一个谜。我在 RDD 中有一堆长文档可用作 Python 字节串 (b"I'm a byte string")。现在我将此 RDD 转换为 DataFrame 以将其加入另一个 DataFrame。我是这样做的:

Data_RDD = Paths_RDD.map(open_paths).flatMap(split_files)

Data_schema = StructType([
    StructField("URI", StringType(), True),
    StructField("Content", StringType(), True),
])

Data_DF = sqlContext.createDataFrame(Data_RDD, schema=Data_schema)

print(Data_DF.show(5))

+--------------------+-----------+
|                 URI|    Content|
+--------------------+-----------+
|http://01storytel...|[B@10628e42|
|http://05yxgs.com...|[B@36699775|
|http://1.lhcmaima...|[B@4e569e3b|
|http://100100.ove...|[B@18ae5bab|
|http://1015theriv...|[B@5f044435|
+--------------------+-----------+
only showing top 5 rows

这些简短的"[B@10628e42" 字符串对我来说似乎毫无用处,可能是某种指针。字节串在 RDD 中仍然是“完整的”,因为我仍然可以访问它们。所以在从 RDD 到DataFrame 的转换中,问题就出现了。现在我尝试将字节串存储在其他类型的字段中,即ByteType()BinaryType()。两者都不起作用,因为这些错误消息不接受字节字符串:

TypeError: ByteType can not accept object b'some string' in type <class 'bytes'>
TypeError: BinaryType can not accept object b'some string' in type <class 'bytes'>

但它变得更加奇怪。当我设置一个小规模实验时:

ByteStrings = [b'one',b'two',b'three']
rdd_ByteStrings = sc.parallelize(ByteStrings)
print(rdd_ByteStrings.take(3))

DF2_schema = StructType([
    StructField("ByteString", StringType(), True),
])
DF_ByteStrings = sqlContext.createDataFrame(rdd_ByteStrings,schema=DF2_schema)

print(DF_ByteStrings.show())

在 StringType 列中也不允许使用小字节串。当我尝试运行它时,我收到以下错误消息:

StructType can not accept object b'one' in type <class 'bytes'>

当我尝试让 spark 推断类型时,它也会失败并显示以下消息:

TypeError: Can not infer schema for type: <class 'bytes'>

所以任何想法我可以如何将字节串存储在 DataFrame 中而不是 .decode() 它们。这是我只有在将两个DataFrames 连接在一起后才能做到的事情,因为另一个拥有解码信息。

我使用 Python 3.5 和 Spark 2.0.1

提前致谢!

【问题讨论】:

    标签: python-3.x apache-spark dataframe pyspark apache-spark-sql


    【解决方案1】:

    这不是一个谜。一步一步:

    • Spark 使用 Pyrolite 在 Python 和 Java 类型之间进行转换。
    • bytes 的 Java 类型是 byte[],相当于 Scala 中的 Array[Byte]
    • 您将列定义为StringType,因此Array[Byte] 将在存储到DataFrame 之前转换为String
    • Arrays 在 Scala 中是丑陋的 Java 工件,并且在其他问题中没有有用的 toString 方法:

      Array(192, 168, 1, 1).map(_.toByte)
      
      Array[Byte] = Array(-64, -88, 1, 1)
      
      Array(192, 168, 1, 1).map(_.toByte).toString
      
      String = [B@6c9fe061
      

      这是获取列内容的方式。

    Spark SQL 中没有直接映射到 Python bytes 的类型。我个人会加入join RDD,但如果你真的想使用DataFrames,你可以使用中间的BinaryType 表示。

    from collections import namedtuple
    
    Record = namedtuple("Record", ["url", "content"])
    
    rdd = sc.parallelize([Record("none://", b"foo"), Record("none://", b"bar")])
    df = rdd.map(lambda rec: Record(rec.url, bytearray(rec.content))).toDF()
    
    df.printSchema()
    
    root
     |-- url: string (nullable = true)
     |-- content: binary (nullable = true)
    

    它不会为您提供可以在本机 (JVM) 中使用的内容,也不会提供有意义的字符串表示:

    +-------+----------+
    |    url|   content|
    +-------+----------+
    |none://|[66 6F 6F]|
    |none://|[62 61 72]|
    +-------+----------+
    

    但无损:

    df.rdd.map(lambda row: bytes(row.content)).first()
    
    b'foo'
    

    并且可以在Python中访问udf

    from pyspark.sql.functions import udf
    from pyspark.sql import Column
    from typing import Union
    
    def decode(col: Union[str, Column], enc: str="utf-8") -> Column:
        def decode_(bs: Union[bytearray, None]) -> Union[str, None]:
            if bs is not None:
                return bytes(bs).decode(enc)
            except UnicodeDecodeError:
                pass 
        return udf(decode_)(col)
    
    df.withColumn("decoded", decode("content")).show()
    
    +-------+----------+-------+
    |    url|   content|decoded|
    +-------+----------+-------+
    |none://|[66 6F 6F]|    foo|
    |none://|[62 61 72]|    bar|
    +-------+----------+-------+
    

    【讨论】:

    • 我还必须设置 PYTHONHASHSEED 才能使 rdd 连接成为可能!
    • @user6910411,你可以回复stackoverflow.com/questions/54033132/…帖子
    • @user6910411 能否请您解释一下解码函数的语法,以便python初学者了解它在做什么。
    猜你喜欢
    • 2017-11-19
    • 2017-03-14
    • 2020-04-16
    • 2017-01-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-01-21
    • 1970-01-01
    相关资源
    最近更新 更多