【问题标题】:Spark (2.2): deserialise Thrift records from Kafka using Structured StreamingSpark (2.2):使用结构化流从 Kafka 反序列化 Thrift 记录
【发布时间】:2018-03-31 04:04:07
【问题描述】:

我是新来的火花。我使用结构化流从 kafka 读取数据。

我可以在 Scala 中使用以下代码读取数据:

val data = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topics)
      .option("startingOffsets", startingOffsets) 
      .load()

我在 value 列中的数据是 Thrift 记录。 Streaming api 以二进制格式提供数据。我看到了将数据转换为字符串或 json 的示例,但我找不到任何关于如何将数据反序列化为 Thrift 的示例。

我怎样才能做到这一点?

【问题讨论】:

    标签: scala apache-spark spark-streaming thrift


    【解决方案1】:

    我在 databricks 网站上找到了这个博客。它展示了如何利用 Spark SQL 的 API 来使用和转换来自 Apache Kafka 的复杂数据流。

    https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html

    有一节解释如何使用 UDF 来反序列化行:

    object MyDeserializerWrapper {
      val deser = new MyDeserializer
    }
    spark.udf.register("deserialize", (topic: String, bytes: Array[Byte]) => 
      MyDeserializerWrapper.deser.deserialize(topic, bytes)
    )
    
    df.selectExpr("""deserialize("topic1", value) AS message""")
    

    我正在使用 java,因此必须编写以下示例 UDF,以检查如何在 java 中调用它:

    UDF1<byte[], String> mode = new UDF1<byte[], String>() {
                @Override
                public String call(byte[] bytes) throws Exception {
                    String s = new String(bytes);
                    return "_" + s;
                }
            };
    

    现在我可以在结构化流式字数统计示例中使用这个 UDF,如下所示:

    Dataset<String> words = df
                    //converted the DataFrame to a Dataset of String using .as(Encoders.STRING())
    //                .selectExpr("CAST(value AS STRING)")
                    .select( callUDF("mode", col("value")) )
                    .as(Encoders.STRING())
                    .flatMap(
                            new FlatMapFunction<String, String>() {
                                @Override
                                public Iterator<String> call(String x) {
                                    return Arrays.asList(x.split(" ")).iterator();
                                }
                            }, Encoders.STRING());
    

    对我来说,下一步是编写一个用于节俭反序列化的 UDF。我一完成就会发布它。

    【讨论】:

      【解决方案2】:

      好吧,这是后续解决方案。我无法发布自己的代码,但这是您可以使用的公共代码,感谢所有者/编码员。

      https://github.com/airbnb/airbnb-spark-thrift/blob/master/src/main/scala/com/airbnb/spark/thrift/

      首先,需要调用convertObject函数将array[byte]/value转为Row,我们称之为makeRow

      其次,你需要通过调用convert函数来获取你的thrift类structType/schema,让我们调用最终结果schema

      那么你需要像这样val deserializer = udf((bytes: Array[Byte]) =&gt; makeRow(bytes), schema)注册一个UDF

      注意:不能在不传递模式的情况下直接使用 makeRow,否则 Spark 会报错:Schema for type org.apache.spark.sql.Row is not supported

      那么就可以这样使用了:

      val stuff = kafkaStuff.withColumn("data", deserializer(kafkaStuff("value"))) val finalStuff = stuff.select("data.*")

      然后……你完成了! 希望这会有所帮助。

      另外感谢这篇文章Spark UDF for StructType / Row 当我之前的解决方案如此接近时,这给了我最终的想法。

      【讨论】:

        猜你喜欢
        • 2021-09-30
        • 2017-10-11
        • 1970-01-01
        • 2017-08-23
        • 2019-11-25
        • 2019-04-30
        • 2017-08-09
        • 2019-08-11
        • 2018-08-18
        相关资源
        最近更新 更多