【问题标题】:SnappyData streaming table error converting "timestamp" datatypesSnappyData 流表错误转换“时间戳”数据类型
【发布时间】:2016-09-24 04:17:26
【问题描述】:

我有一个从 kafka 主题读取 json 的快速流表。经过一些工作,我已经完成了这项工作,但是在尝试将 java.sql.Timestamp 值从我的 SensorData 对象映射到流表时遇到了问题。

错误发生在 org.apache.spark.sql.catalyst.CatalystTypeConverters 的第 318 行,在此方法中:

  private object StringConverter extends CatalystTypeConverter[Any, String, UTF8String] {
    override def toCatalystImpl(scalaValue: Any): UTF8String = scalaValue match {
      case str: String => UTF8String.fromString(str)
      case utf8: UTF8String => utf8
    }
    override def toScala(catalystValue: UTF8String): String =
      if (catalystValue == null) null else catalystValue.toString
    override def toScalaImpl(row: InternalRow, column: Int): String =
      row.getUTF8String(column).toString
  }

我运行了调试,代码显然在这里期待一个字符串值,但我的 sensorData 对象(和流表)传感器和收集时间是时间戳。因此它抱怨无法转换值。

下面是我的 SensorData 类,我用它来映射来自 Kafka 的传入 json 消息中的值。然后在我的自定义转换器中,我将这些值映射到我的Seq[Row] 中的toRows(...) 方法中。

class SensorData {
    var sensor_id: String = _
    var metric: String = _
    var collection_time: java.sql.Timestamp = _
    var sensor_time: java.sql.Timestamp = _
//    var collection_time: String = _
//    var sensor_time: String = _
    var value: String = _
    var year_num: Int = _
    var month_num: Int = _
    var day_num: Int = _
    var hour_num: Int = _

}

这是我的流表:

snsc.sql(s"CREATE STREAM TABLE sensor_data_stream if not exists " +
        "(sensor_id string, " +
        "metric string, " +
        "collection_time TIMESTAMP, " +
        "value VARCHAR(128), " +
        "sensor_time TIMESTAMP, " +
        "year_num integer, " +
        "month_num integer, " +
        "day_num integer, " +
        "hour_num integer  " +
        ") " +
        "using kafka_stream " +
        "options (storagelevel 'MEMORY_AND_DISK_SER_2', " +
        "rowConverter 'org.me.streaming.sensor.test.converter.SensorConverter', " +
        "zkQuorum 'localhost:2181', " +
        " groupId 'sensorConsumer',  topics 'sensorTest:01')")

现在为了解决这个问题,我将 SensorData 对象中的数据类型以及流表中的列数据类型更改为字符串: 即:

    "collection_time string, " +
    "sensor_time string, " +

因此,在更改此数据类型后,我能够成功地将数据从 Kafka 流式传输到我的目标列表。

我的问题...我对 SnappyData/Streaming 世界还很陌生,想知道这是否是一个错误(已知/未知),还是有更优雅的方法将 Timestamp 数据类型绑定到流表?

******根据响应更新********

这是我的行转换器:

class SensorConverter extends StreamToRowsConverter with Serializable {

  override def toRows(message: Any): Seq[Row] = {

  val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)    

  val sensor = mapper.readValue(message.toString(), classOf[SensorData])

    Seq(Row.fromSeq(Seq(
      sensor.sensor_id,
      sensor.metric,
      sensor.collection_time,
      sensor.value,
      sensor.sensor_time,
      sensor.year_num,
      sensor.month_num,
      sensor.day_num,
      sensor.hour_num)))
  }

}

我最初尝试转换一个 java 对象,但在解码它时遇到了问题(可能是由于我目前在升级时缺乏对 API 的了解)。我最终只是将一个 json 字符串传递给 Kafka。

我在提供的示例中看到@https://github.com/SnappyDataInc/snappy-poc/blob/master/src/main/scala/io/snappydata/adanalytics/Codec.scala 在构建我的 Seq[Row] 时,我没有使用 java.sql.Timestamp 调用正确地包装传入的时间戳值(这是一个很长的值)。我会试一试,看看是否能解决我的问题。

【问题讨论】:

    标签: types streaming snappydata


    【解决方案1】:

    这是一个示例,您可以参考将时间戳与流表一起使用。 https://github.com/SnappyDataInc/snappy-poc/blob/master/src/main/scala/io/snappydata/adanalytics/Codec.scala

    请检查 AdImpressionToRowsConverter#toRows 实施。在这种情况下,我们从 kafka 接收长值(System.currentTimeMills)并转换为 java.sql.Timestamp

    这里是时间戳类型的流表定义-
    https://github.com/SnappyDataInc/snappy-poc/blob/master/src/main/scala/io/snappydata/adanalytics/SnappySQLLogAggregatorJob.scala

    您能否提供 SensorConvertor#toRows 的实现? 您是否为 SensorData 对象使用相应的解码器?

    【讨论】:

    • 感谢 Yogesh。我已经更新了我上面的问题。我发现我的实现与您提供的示例有所不同。我会试试的。
    猜你喜欢
    • 2017-06-26
    • 1970-01-01
    • 2012-08-19
    • 2012-08-29
    • 2016-12-06
    • 2015-11-24
    • 2012-10-09
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多