【Question title】: Casting Cassandra timestamp column as timeuuid
【Posted】: 2019-09-27 05:32:09
【Question】:

I'm consuming events from Kafka and storing them in Cassandra. I parse JSON that contains the fields eventID, sessionID, timestamp and userID, and I created the Cassandra table like this:

cassandra@cqlsh> CREATE TABLE mydata.events (
   ...     "event_date" date,
   ...     "eventID" text,
   ...     "userID" text,
   ...     timestamp timeuuid,
   ...     "sessionID" text,
   ...     "fullJson" text,
   ...     PRIMARY KEY ("event_date", timestamp, "sessionID")
   ... );

In the code:

import java.util.UUID
import com.datastax.driver.core.LocalDate

case class cassFormat(
                       eventID: String,
                       sessionID: String,
                       timestamp: UUID, // timeuuid value; the field name must match the "timestamp" column
                       userID: String,
                       event_date: LocalDate, // yyyy-MM-dd
                       fullJson: String // full json from Kafka
                     )

I need to store the timestamp column as a timeuuid. Since I'm parsing JSON, I extract all the values from the header and build the columns like this:

 val allJson = rdd.
            map(x => {
              implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
              //use serialization default to format a Map to JSON
              (x, Serialization.write(x))
            }).
            filter(x => x._1 isDefinedAt "header").
            map(x => (x._1("header"), x._2)).
            filter(x => (x._1 isDefinedAt "userID") &&
              (x._1 isDefinedAt "eventID") &&
              (x._1 isDefinedAt "sessionID") &&
              (x._1 isDefinedAt "timestamp")).
            map(x => cassFormat(x._1("eventID").toString,
              x._1("sessionID").toString,
              com.datastax.driver.core.utils.UUIDs.startOf(x._1("timestamp").toString.toLong),
              x._1("userID").toString,
              com.datastax.driver.core.LocalDate.fromMillisSinceEpoch(x._1("timestamp").toString.toLong),
              x._2))

This part:

com.datastax.driver.core.utils.UUIDs.startOf(x._1("timestamp").toString.toLong)

throws the error:

java.lang.NumberFormatException: For input string: "2019-05-09T09:00:52.553+0000"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)

I even tried java.util.UUID.fromString(x._1("timestamp").toString), which fails as well. How do I correctly convert the timestamp to a timeuuid and insert it into Cassandra from a Spark job?

【Question discussion】:

    Tags: scala apache-spark cassandra timeuuid


    【Answer 1】:

    I solved this with a UDF.

    import com.datastax.driver.core.utils.UUIDs
    import org.apache.spark.sql.functions.udf
     
    // java.sql.Timestamp -> timeuuid string (the smallest timeuuid for that millisecond)
    val toTimeuuid: java.sql.Timestamp => String = x => UUIDs.startOf(x.getTime()).toString()
    // timeuuid string -> java.sql.Timestamp (the Unix millis encoded in the UUID)
    val fromTimeuuid: String => java.sql.Timestamp = x => new java.sql.Timestamp(UUIDs.unixTimestamp(java.util.UUID.fromString(x)))
     
    val toTimeuuidUDF = udf(toTimeuuid)
    val fromTimeuuidUDF = udf(fromTimeuuid)
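To see what these two UDFs compute without Spark or the driver on the classpath, here is a minimal plain-Scala sketch of the same conversion. It assumes the RFC 4122 version-1 layout (a 60-bit count of 100-ns intervals since 1582-10-15, split into time_low, time_mid and time_hi) and uses 0x8080808080808080 as the low 64 bits, which we believe matches the "minimum" value the DataStax 3.x driver uses in UUIDs.startOf; treat that constant as an assumption. TimeUuidSketch, startOfMillis and unixMillis are hypothetical names, not a library API.

```scala
import java.util.UUID

// Hypothetical helper: a dependency-free sketch of what UUIDs.startOf and
// UUIDs.unixTimestamp do in the DataStax Java driver.
object TimeUuidSketch {
  // Number of 100-ns intervals between the UUID epoch (1582-10-15T00:00Z) and 1970-01-01T00:00Z.
  val UuidEpochOffset: Long = 0x01B21DD213814000L

  // Assumption: the low 64 bits the driver uses for its "minimum" timeuuid (IETF variant bits set).
  val MinClockSeqAndNode: Long = 0x8080808080808080L

  // Most significant bits of a version-1 UUID for a Unix timestamp in milliseconds.
  def msbFor(millis: Long): Long = {
    val t = millis * 10000L + UuidEpochOffset // 60-bit UUID timestamp in 100-ns units
    ((t & 0xFFFFFFFFL) << 32) |               // time_low  -> bits 63..32
      (((t >>> 32) & 0xFFFFL) << 16) |        // time_mid  -> bits 31..16
      0x1000L |                               // version 1 -> bits 15..12
      ((t >>> 48) & 0x0FFFL)                  // time_hi   -> bits 11..0
  }

  // Counterpart of toTimeuuid: the smallest timeuuid for the given millisecond.
  def startOfMillis(millis: Long): UUID = new UUID(msbFor(millis), MinClockSeqAndNode)

  // Counterpart of fromTimeuuid: the Unix millis encoded in a version-1 UUID.
  def unixMillis(uuid: UUID): Long = (uuid.timestamp() - UuidEpochOffset) / 10000L
}
```

Here startOfMillis(ts.getTime) plays the role of toTimeuuid and unixMillis(UUID.fromString(s)) that of fromTimeuuid; inside a Spark job the driver's own UUIDs helpers remain the simpler choice, and the sketch only makes the bit layout visible.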
    

    【Discussion】:

      【Answer 2】:

      You have a string that is not a number, and you are trying to convert it to a number with toLong. Hence the exception.

      Looking at this, it seems you can get a UUID based on a given timestamp with this method:

      public static UUID getTimeUUID(long when)
      

      You have to parse the string into a DateTime or an Instant, and then pass that DateTime/Instant's milliseconds to getTimeUUID.
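Applied to the string from the question, the parsing step might look like the following sketch. It assumes every timestamp has the yyyy-MM-dd'T'HH:mm:ss.SSSZ shape seen in the error message, and it parses with java.time.OffsetDateTime because the string carries a UTC offset rather than a zone ID. EventTimeParsing and toEpochMillis are hypothetical names. The resulting millis are what UUIDs.startOf expects in place of toString.toLong.

```scala
import java.time.OffsetDateTime
import java.time.format.DateTimeFormatter
import scala.util.Try

// Hypothetical helper for turning the event's timestamp string into Unix epoch millis.
object EventTimeParsing {
  // Matches strings such as "2019-05-09T09:00:52.553+0000"; pattern letter 'Z' parses offsets like +0000.
  val EventTimeFormat: DateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ")

  // Epoch millis for a well-formed timestamp, None if the string does not match the pattern.
  def toEpochMillis(raw: String): Option[Long] =
    Try(OffsetDateTime.parse(raw, EventTimeFormat).toInstant.toEpochMilli).toOption
}
```

Returning an Option leaves it to the caller to decide what a malformed timestamp means (drop the record, log it, or fall back to another time) instead of silently substituting a value.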

      【Discussion】:

      • I know from the error that the string can't be converted to a Long. The question is how to do this with the DataStax driver in a Scala/Spark application.

      【Answer 3】:

      I managed to do it by converting the timestamp string to a DateTime, then to millis, and then generating the UUID:

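      import java.time.ZonedDateTime
      import java.time.format.DateTimeFormatter
      import java.util.UUID
      import java.util.concurrent.ThreadLocalRandom
      import com.datastax.driver.core.utils.UUIDs
      import com.datastax.spark.connector._ // adds saveToCassandra to RDDs
      import org.json4s.DefaultFormats
      import org.json4s.jackson.Serialization // or org.json4s.native.Serialization, depending on the json4s backend

      val dateTimePattern = "yyyy-MM-dd'T'HH:mm:ss.SSSZ"
      // Note: DateTimeFormatter is not java.io.Serializable; if Spark reports "Task not serializable",
      // create the formatter inside the closure (or in a top-level object) instead.
      val dateFormatter = DateTimeFormatter.ofPattern(dateTimePattern)
      
      val allJson = rdd.
                    map(x => {
                      implicit val formats: DefaultFormats.type = org.json4s.DefaultFormats
                      //use serialization default to format a Map to JSON
                      (x, Serialization.write(x))
                    }).
                    filter(x => x._1 isDefinedAt "header").
                    map(x => (x._1("header"), x._2)).
                    filter(x => (x._1 isDefinedAt "userID") &&
                      (x._1 isDefinedAt "eventID") &&
                      (x._1 isDefinedAt "sessionID") &&
                      (x._1 isDefinedAt "timestamp")).
                    map(x => {
                      var millis: Long  = System.currentTimeMillis() // if timestamp format is invalid, put current timestamp instead
                      try {
                        // timestamp string from the event JSON, e.g. "2019-05-09T09:00:52.553+0000"
                        val dateStr: String = x._1("timestamp").asInstanceOf[String]
                        // create DateTime from Timestamp string
                        val dateTime: ZonedDateTime = ZonedDateTime.parse(dateStr, dateFormatter)
                        // create millis from DateTime
                        millis = dateTime.toInstant.toEpochMilli
                      } catch {
                        case e: Exception =>
                          e.printStackTrace()
                      }
                      // generate timeuuid: time bits from millis, random low 64 bits
                      val uuid = new UUID(UUIDs.startOf(millis).getMostSignificantBits, ThreadLocalRandom.current().nextLong())
                      // generate eventDate
                      val eventDate = com.datastax.driver.core.LocalDate.fromMillisSinceEpoch(millis)
                      cassFormat(x._1("eventID").toString,
                        x._1("sessionID").toString,
                        uuid,
                        x._1("userID").toString,
                        eventDate,
                        x._2)
                    })
                  allJson.saveToCassandra(CASSANDRA_KEYSPACE_NAME, CASSANDRA_EVENTS_TABLE)
              } // this and the next line close enclosing blocks not shown in the answer
            })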
      

      The timestamp column in Cassandra now looks like: 58976340-7313-11e9-910d-60dce7513b94
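One detail worth spelling out: for a given millisecond UUIDs.startOf always returns the same UUID, so two events with the same event_date, timestamp and sessionID would map to the same primary key and the second write would overwrite the first. Replacing the low 64 bits with random bits, as this answer does, avoids that. A raw random Long does not necessarily carry the RFC 4122 variant bits, though, so the sketch below forces them. It is a minimal, dependency-free version under those assumptions; UniqueTimeUuid and forMillis are hypothetical names.

```scala
import java.util.UUID
import java.util.concurrent.ThreadLocalRandom

// Hypothetical helper: a version-1 UUID carrying `millis`, with random low 64 bits.
object UniqueTimeUuid {
  // 100-ns intervals between the UUID epoch (1582-10-15T00:00Z) and the Unix epoch.
  private val UuidEpochOffset: Long = 0x01B21DD213814000L

  // Most significant bits of a version-1 UUID (time_low | time_mid | version 1 | time_hi).
  private def msbFor(millis: Long): Long = {
    val t = millis * 10000L + UuidEpochOffset
    ((t & 0xFFFFFFFFL) << 32) | (((t >>> 32) & 0xFFFFL) << 16) | 0x1000L | ((t >>> 48) & 0x0FFFL)
  }

  // Random clock-seq/node bits with the top two bits forced to the IETF variant (binary 10).
  private def randomLsb(): Long =
    (ThreadLocalRandom.current().nextLong() & 0x3FFFFFFFFFFFFFFFL) | Long.MinValue

  // Events in the same millisecond get the same time bits but (almost certainly) different UUIDs.
  def forMillis(millis: Long): UUID = new UUID(msbFor(millis), randomLsb())
}
```

In the map above, UniqueTimeUuid.forMillis(millis) could stand in for the new UUID(UUIDs.startOf(millis).getMostSignificantBits, ...) expression. The UUIDs still sort by their timestamp, so clustering by time is preserved; only the order among events within the same millisecond becomes arbitrary.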

      【Discussion】:
