【问题标题】:Schema lost with ApacheBahir Stuctured Streaming connector on ApacheSpark streamingApache Spark 流上的 Apache Bahir Structured Streaming 连接器丢失了架构
【发布时间】:2017-02-07 16:39:15
【问题描述】:

我正在尝试将 ApacheSpark 结构化流连接到 MQTT 主题(本例中为 IBM Bluemix 上的 IBM Watson IoT Platform)。

我正在按如下方式创建结构化流:

val df = spark.readStream 
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    .option("username","a-vy0z2s-q6s8r693hv")
    .option("password","B+UX(aWuFPvX")
    .option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf")
    .option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json")
    .load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883")

到目前为止一切顺利,在 REPL 中我按如下方式返回了这个 df 对象:

df: org.apache.spark.sql.DataFrame = [值:字符串,时间戳: 时间戳]

我从this thread 了解到,每次连接时我都必须更改客户端 ID。所以这已经解决了,但是如果我开始使用这一行从流中读取:

val 查询 = df.writeStream。输出模式(“追加”)。
格式(“控制台”).start()

然后生成的架构如下所示:

df: org.apache.spark.sql.DataFrame = [值:字符串,时间戳:时间戳]

数据如下:

这意味着我的 JSON 流被转换为包含 JSON 表示的字符串对象流。

这是 ApacheBahir 的限制吗?

同时提供架构也无济于事,因为以下代码类似于相同的结果:

import org.apache.spark.sql.types._
val schema = StructType(
    StructField("count",LongType,true)::
    StructField("flowrate",LongType,true)::
    StructField("fluidlevel",StringType,true)::
    StructField("frequency",LongType,true)::
    StructField("hardness",LongType,true)::
    StructField("speed",LongType,true)::
    StructField("temperature",LongType,true)::
    StructField("ts",LongType,true)::
    StructField("voltage",LongType,true)::
Nil)

:paste
val df = spark.readStream
    .schema(schema)
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    .option("username","a-vy0z2s-q6s8r693hv")
    .option("password","B+UX(a8GFPvX")
    .option("clientId","a:vy0z2s:a-vy0z2s-zfzzckrnqf4")
.option("topic", "iot-2/type/WashingMachine/id/Washer02/evt/voltage/fmt/json")
    .load("tcp://vy0z2s.messaging.internetofthings.ibmcloud.com:1883")

【问题讨论】:

    标签: scala apache-spark mqtt watson-iot apache-bahir


    【解决方案1】:

    许多DataSources,包括but not limited to MQTTStreamSource,都有固定的架构,由消息和时间戳组成。 Schema 没有丢失,只是没有被解析,这是一种预期的行为。

    如果架构是固定的并且预先知道,您应该能够使用from_json 函数:

    import org.apache.spark.sql.functions.from_json
    
    df.withColumn("value", from_json($"value", schema))
    

    【讨论】:

      【解决方案2】:

      对于解析(因为我不再使用四个“from_json”方法)我已经使用了

      导入 org.apache.spark.sql.functions.json_tuple

      和下面的代码,它也可以工作:

      df.withColumn("value",json_tuple($"value","myColumnName"))

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2017-05-09
        • 2022-01-14
        • 2015-09-20
        • 1970-01-01
        • 1970-01-01
        • 2015-08-26
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多