【问题标题】:Pyspark 2.4.0, read avro from kafka with read stream - PythonPyspark 2.4.0,使用读取流从 kafka 读取 avro - Python
【发布时间】:2019-07-08 15:21:09
【问题描述】:

我正在尝试使用 PySpark 2.4.0 从 Kafka 读取 avro 消息。

spark-avro 外部模块可以提供这个解决方案来读取 avro 文件:

df = spark.read.format("avro").load("examples/src/main/resources/users.avro") 
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

但是,我需要阅读流式传输的 avro 消息。库文档建议使用 from_avro() 函数,该函数仅适用于 Scala 和 Java。

是否还有其他模块支持读取从 Kafka 流式传输的 avro 消息?

【问题讨论】:

    标签: python apache-spark pyspark apache-kafka avro


    【解决方案1】:

    您可以包含 spark-avro 包,例如使用 --packages(调整版本以匹配 spark 安装):

    bin/pyspark --packages org.apache.spark:spark-avro_2.11:2.4.0
    

    并提供您自己的包装器:

    from pyspark.sql.column import Column, _to_java_column 
    
    def from_avro(col, jsonFormatSchema): 
        sc = SparkContext._active_spark_context 
        avro = sc._jvm.org.apache.spark.sql.avro
        f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
        return Column(f(_to_java_column(col), jsonFormatSchema)) 
    
    
    def to_avro(col): 
        sc = SparkContext._active_spark_context 
        avro = sc._jvm.org.apache.spark.sql.avro
        f = getattr(getattr(avro, "package$"), "MODULE$").to_avro
        return Column(f(_to_java_column(col))) 
    

    使用示例(取自the official test suite):

    from pyspark.sql.functions import col, struct
    
    
    avro_type_struct = """
    {
      "type": "record",
      "name": "struct",
      "fields": [
        {"name": "col1", "type": "long"},
        {"name": "col2", "type": "string"}
      ]
    }"""
    
    
    df = spark.range(10).select(struct(
        col("id"),
        col("id").cast("string").alias("id2")
    ).alias("struct"))
    avro_struct_df = df.select(to_avro(col("struct")).alias("avro"))
    avro_struct_df.show(3)
    
    +----------+
    |      avro|
    +----------+
    |[00 02 30]|
    |[02 02 31]|
    |[04 02 32]|
    +----------+
    only showing top 3 rows
    
    avro_struct_df.select(from_avro("avro", avro_type_struct)).show(3)
    
    +------------------------------------------------+
    |from_avro(avro, struct<col1:bigint,col2:string>)|
    +------------------------------------------------+
    |                                          [0, 0]|
    |                                          [1, 1]|
    |                                          [2, 2]|
    +------------------------------------------------+
    only showing top 3 rows
    

    【讨论】:

    • 这里要注意我在使用 spark-submit 导入包时遇到的一个问题是 $spark-submit job.py --packages org.apache.spark:spark-avro_2.11:2.4 .0 不起作用。相反,它应该像这样写 $spark-submit --packages org.apache.spark:spark-avro_2.11:2.4.0 job.py
    • 这不适用于来自 Confluent Schema Registry 的 Avro。为此,这个答案似乎更好stackoverflow.com/a/55786881/2308683
    猜你喜欢
    • 2020-03-17
    • 2017-04-04
    • 2017-05-06
    • 2022-11-24
    • 2021-01-05
    • 1970-01-01
    • 1970-01-01
    • 2019-09-20
    • 2018-06-21
    相关资源
    最近更新 更多