我在 databricks 网站上找到了这个博客。它展示了如何利用 Spark SQL 的 API 来使用和转换来自 Apache Kafka 的复杂数据流。
https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
有一节解释如何使用 UDF 来反序列化行:
object MyDeserializerWrapper {
val deser = new MyDeserializer
}
spark.udf.register("deserialize", (topic: String, bytes: Array[Byte]) =>
MyDeserializerWrapper.deser.deserialize(topic, bytes)
)
df.selectExpr("""deserialize("topic1", value) AS message""")
我正在使用 java,因此必须编写以下示例 UDF,以检查如何在 java 中调用它:
UDF1<byte[], String> mode = new UDF1<byte[], String>() {
@Override
public String call(byte[] bytes) throws Exception {
String s = new String(bytes);
return "_" + s;
}
};
现在我可以在结构化流式字数统计示例中使用这个 UDF,如下所示:
Dataset<String> words = df
//converted the DataFrame to a Dataset of String using .as(Encoders.STRING())
// .selectExpr("CAST(value AS STRING)")
.select( callUDF("mode", col("value")) )
.as(Encoders.STRING())
.flatMap(
new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String x) {
return Arrays.asList(x.split(" ")).iterator();
}
}, Encoders.STRING());
对我来说,下一步是编写一个用于节俭反序列化的 UDF。我一完成就会发布它。