【发布时间】:2020-08-17 10:34:31
【问题描述】:
我从 kafka 源读取行,我想建立一个 kafka 消费者......在 spark 结构化流中 我知道如何告诉 spark 传入的行是 json 类型......我如何对 from_csv 做同样的事情?
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic2")
.option("startingOffsets", "earliest")
.load()
.selectExpr("CAST(value AS STRING)")
.select(functions.from_json($"value", retailDataSchema).as("data"))
lines.printSchema()
架构是:
val retailDataSchema = new StructType()
.add("InvoiceNo", IntegerType)
.add("Quantity", IntegerType)
.add("Country", StringType)
谢谢!
输入数据如下所示:
【问题讨论】:
标签: scala apache-spark apache-kafka spark-structured-streaming