【问题标题】:How to parse Json formatted Kafka message in spark streaming如何在火花流中解析 Json 格式的 Kafka 消息
【发布时间】:2016-08-13 06:03:48
【问题描述】:

我在 Kafka 上有这样的 JSON 消息:

{"id_post":"p1", "message":"blablabla"}

我想解析消息,并打印(或用于进一步计算)message 元素。 使用以下代码打印 json

val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, inputGroup, topicMap)
val postStream = kafkaStream.map(_._2)
postStream.foreachRDD((rdd, time) => {
val count = rdd.count()
if (count > 0){
    rdd.foreach(record => {
      println(record)
    }
}

但我无法获得单个元素。 我尝试了一些 JSON 解析器,但没有运气。 有什么想法吗?

更新: 不同 JSON 解析器的一些错误 这是circe解析器的代码和输出:

val parsed_record = parse(record)

和输出:

14:45:00,676 ERROR Executor:95 - Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
        at io.circe.jawn.CirceSupportParser$$anon$1$$anon$4.add(CirceSupportParser.scala:36)
        at jawn.CharBasedParser$class.parseString(CharBasedParser.scala:90)
        at jawn.StringParser.parseString(StringParser.scala:15)
        at jawn.Parser.rparse(Parser.scala:397)
        at jawn.Parser.parse(Parser.scala:338)
        at jawn.SyncParser.parse(SyncParser.scala:24)
        at jawn.SupportParser$$anonfun$parseFromString$1.apply(SupportParser.scala:15)

等等..在我使用parse(record)的那一行 看起来它无法访问和/或解析字符串 record

如果我使用 lift-json 也一样 在parse(record) 处,错误输出或多或少相同:

16:58:20,425 ERROR Executor:95 - Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.NoSuchMethodError: scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
at net.liftweb.json.JsonParser$$anonfun$2.apply(JsonParser.scala:144)
at net.liftweb.json.JsonParser$$anonfun$2.apply(JsonParser.scala:141)
at net.liftweb.json.JsonParser$.parse(JsonParser.scala:80)
at net.liftweb.json.JsonParser$.parse(JsonParser.scala:45)
at net.liftweb.json.package$.parse(package.scala:40)
at SparkConsumer$$anonfun$main$1$$anonfun$apply$1.apply(SparkConsumer.scala:98)
at SparkConsumer$$anonfun$main$1$$anonfun$apply$1.apply(SparkConsumer.scala:95)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)

在 scala.collection.AbstractIterator.foreach(Ite​​rator.scala:1157)

【问题讨论】:

  • 您在使用 JSON 解析器时遇到了哪些问题?
  • 如果JSON有多个换行符\n怎么办?

标签: json scala apache-spark apache-kafka spark-streaming


【解决方案1】:

在 Scala/Apache Spark 中从 JSON 字符串中提取数据

import org.apache.spark.rdd.RDD

object JsonData extends serializable{
  def main(args: Array[String]): Unit = {

    val msg = "{ \"id_post\":\"21\",\"message\":\"blablabla\"}";
    val m1 = msgParse(msg)
    println(m1.id_post)
  }
  case class SomeClass(id_post: String, message: String) extends serializable
  def msgParse(msg: String): SomeClass = {
    import org.json4s._
    import org.json4s.native.JsonMethods._
    implicit val formats = DefaultFormats
    val m = parse(msg).extract[SomeClass]
      return m

  }

}

以下是 Maven 正派

<dependency>
    <groupId>org.json4s</groupId>
    <artifactId>json4s-native_2.10</artifactId>
    <version>3.3.0</version>
</dependency>

【讨论】:

    【解决方案2】:

    你也有同样的问题。

    但是我通过使用fastjson 解决了这个问题。

    SBT dependency : // http://mvnrepository.com/artifact/com.alibaba/fastjson libraryDependencies += "com.alibaba" % "fastjson" % "1.2.12"

    Maven dependency : <!-- http://mvnrepository.com/artifact/com.alibaba/fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.12</version> </dependency>

    你可以试试。希望这会有所帮助。

    【讨论】:

      【解决方案3】:

      我解决了这个问题,所以我写在这里以备将来参考:

      依赖,依赖,依赖!

      我选择使用lift-json,但这适用于任何 JSON 解析器和/或框架。

      我使用的 SPARK 版本 (v1.4.1) 是与 scala 2.10 兼容的版本,这里是 pom.xml 的依赖项:

      <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.10</artifactId>
          <version>1.4.1</version>
          <scope>provided</scope>
      </dependency>
      

      和其他一些库。我使用的是 scala 2.11 的 lift-json 版本......这是错误

      所以,对于未来的我,如果你正在阅读这个主题:与 scala 版本和依赖关系保持一致。 在 lift-json 的情况下:

              <dependency>
                  <groupId>net.liftweb</groupId>
                  <artifactId>lift-json_2.10</artifactId>
                  <version>3.0-M1</version>
              </dependency>
      

      【讨论】:

        猜你喜欢
        • 2014-10-27
        • 2022-01-08
        • 1970-01-01
        • 1970-01-01
        • 2019-09-20
        • 2016-12-24
        • 2017-05-18
        • 2017-03-27
        • 2021-11-25
        相关资源
        最近更新 更多