【问题标题】:Structured Streaming reading from a Kafka topic从 Kafka 主题读取结构化流
【发布时间】:2018-10-28 22:05:20
【问题描述】:

我已读取一个 csv 文件并将值字段转换为字节并使用 Kafka 生产者应用程序写入 Kafka 主题。现在我正在尝试使用结构化流从 Kafka 主题中读取,但无法在值字段上应用自定义 kryo 反序列化。

谁能告诉我如何在结构化流中使用自定义反序列化?

【问题讨论】:

标签: apache-kafka spark-structured-streaming


【解决方案1】:

我遇到了类似的问题,基本上,我将 Kafka 的所有消息都放在 Protobuf 上,然后我用 UDF 解决了这个问题。

from pyspark.sql.functions import udf

def deserialization_function(message):
    #You need to add your code to deserialize your messages
    #I returned a json but you can return other structure
    json = {"x": x_deserializable,
            "y": y_deserializable,
            "w": w_deserializable,
            "z": z_deserializable,
    return json

schema = StructType() \
                    .add("x", TimestampType()) \
                    .add("y", StringType()) \
                    .add("z", StringType()) \
                    .add("w", StringType()) 

own_udf = udf(deserialization_function, schema)

stream = spark.readStream \
          .format("kafka") \
          .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
          .option("subscribe", topic) \
          .load()

query = stream \
        .select(col("value")) \
        .select((own_udf("value")).alias("value_udf")) \
        .select("value_udf.x", "value_udf.y", "value_udf.w", "value_udf.z")

【讨论】:

    猜你喜欢
    • 2020-07-25
    • 2019-08-01
    • 2019-09-19
    • 1970-01-01
    • 1970-01-01
    • 2021-04-22
    • 2019-09-20
    • 2019-09-28
    • 1970-01-01
    相关资源
    最近更新 更多