【问题标题】:How to read streaming data in XML format from Kafka?如何从 Kafka 读取 XML 格式的流数据?
【发布时间】:2018-02-10 18:38:39
【问题描述】:

我正在尝试使用 Spark 结构化流从 Kafka 主题中读取 XML 数据。

我尝试使用 Databricks spark-xml 包,但我收到一条错误消息,指出此包不支持流式读取。有什么方法可以使用结构化流从 Kafka 主题中提取 XML 数据?

我当前的代码:

df = spark \
      .readStream \
      .format("kafka") \
      .format('com.databricks.spark.xml') \
      .options(rowTag="MainElement")\
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option(subscribeType, "test") \
      .load()

错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o33.load.
: java.lang.UnsupportedOperationException: Data source com.databricks.spark.xml does not support streamed reading
        at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234)

【问题讨论】:

    标签: apache-spark xml-parsing pyspark-sql spark-structured-streaming


    【解决方案1】:

    您可以使用SQL built-in 函数xpath 等从作为Kafka 消息的 的嵌套XML 结构中提取数据。

    给定一个类似的嵌套 XML

    <root>
      <ExecutionTime>20201103153839</ExecutionTime>
      <FilterClass>S</FilterClass>
      <InputData>
        <Finance>
          <HeaderSegment>
            <Version>6</Version>
            <SequenceNb>1</SequenceNb>
          </HeaderSegment>
        </Finance>
      </InputData>
    </root>
    

    然后您可以在 selectExpr 语句中使用这些 SQL 函数,如下所示:

    df.readStream.format("kafka").options(...).load()
      .selectExpr("CAST(value AS STRING) as value")
      .selectExpr(
        "xpath(value, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsArryString",
        "xpath_long(value, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsLong",
        "xpath_string(value, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsString",
        "xpath_int(value, '/CofiResults/InputData/Finance/HeaderSegment/Version/text()') as VersionAsInt")
    

    请记住,xpath 函数将返回一个 Array 字符串,而您可能会发现将值提取为 String 甚至 Long 更方便。在 Spark 3.0.1 中使用控制台接收器流应用上述代码将导致:

    +-------------------------+-------------------+---------------------+------------+
    |ExecutionTimeAsArryString|ExecutionTimeAsLong|ExecutionTimeAsString|VersionAsInt|
    +-------------------------+-------------------+---------------------+------------+
    |[20201103153839]         |20201103153839     |20201103153839       |6           |
    +-------------------------+-------------------+---------------------+------------+
    

    【讨论】:

      【解决方案2】:

      看起来上述方法有效,但它没有使用传递的模式来解析 XML 文档。

      如果你打印关系模式,它总是

      INFO  XmlToAvroConverter - .convert() : XmlRelation Schema ={} root
       |-- fields: array (nullable = true)
       |    |-- element: struct (containsNull = true)
       |    |    |-- name: string (nullable = true)
       |    |    |-- nullable: boolean (nullable = true)
       |    |    |-- type: string (nullable = true)
       |-- type: string (nullable = true)
      

      例如:我正在关注来自 Kafka 主题的 XML 文档

      <?xml version="1.0" encoding="UTF-8" standalone="no"?>
      <Book>
      <Author>John Doe</Author>
      <Title>Test</Title>
      <PubishedDate></PublishedDate>
      </Book>
      

      这是我必须将 XML 解析为 DataFrame 的代码

      kafkaValueAsStringDF = kafakDF.selectExpr("CAST(key AS STRING) msgKey","CAST(value AS STRING) xmlString")
      
        var parameters = collection.mutable.Map.empty[String, String]
      
        parameters.put("rowTag", "Book")
      
      kafkaValueAsStringDF.writeStream.foreachBatch {
                (batchDF: DataFrame, batchId: Long) =>
      
       val xmlStringDF:DataFrame = batchDF.selectExpr("xmlString")
      
                  xmlStringDF.printSchema()
      
                  val rdd: RDD[String] = xmlStringDF.as[String].rdd
      
      
                  val relation = XmlRelation(
                    () => rdd,
                    None,
                    parameters.toMap,
                    xmlSchema)(spark.sqlContext)
      
      
                  logger.info(".convert() : XmlRelation Schema ={} "+relation.schema.treeString)
      
      }
              .start()
              .awaitTermination()
      

      当我从文件系统或 S3 读取相同的 XML 文档并使用 spark-xml 并按预期解析架构时。

      谢谢 萨提什

      【讨论】:

      • 请忽略架构加载的问题,它现在按预期工作。
      【解决方案3】:
      import xml.etree.ElementTree as ET
      df = spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "localhost:9092") \
            .option(subscribeType, "test") \
            .load()
      

      然后我写了一个python UDF

      def parse(s):
        xml = ET.fromstring(s)
        ns = {'real_person': 'http://people.example.com',
            'role': 'http://characters.example.com'}
        actor_el = xml.find("DNmS:actor",ns)
      
        if(actor_el ):
             actor = actor_el.text
        role_el.find('real_person:role', ns)
        if(role_el):
             role = role_el.text
        return actor+"|"+role
      

      注册这个UDF

      extractValuesFromXML = udf(parse)
      
         XML_DF= df .withColumn("mergedCol",extractroot("value"))
      
         AllCol_DF= xml_DF.withColumn("actorName", split(col("mergedCol"), "\\|").getItem(0))\
              .withColumn("Role", split(col("mergedCol"), "\\|").getItem(1))
      

      【讨论】:

        【解决方案4】:
        .format("kafka") \
        .format('com.databricks.spark.xml') \
        

        com.databricks.spark.xml 的最后一个获胜并成为流源(隐藏 Kafka 作为源)。

        换句话说,上面相当于.format('com.databricks.spark.xml')单独。

        您可能已经体验过,Databricks spark-xml 包不支持流式读取(即不能充当流式源)。该软件包不适用于流式传输。

        有什么方法可以使用结构化流从 Kafka 主题中提取 XML 数据?

        您只能使用标准函数或 UDF 自行访问和处理 XML。 Spark 2.2.0 之前的结构化流中没有对流式 XML 处理的内置支持。

        无论如何,这应该没什么大不了的。 Scala 代码如下所示。

        val input = spark.
          readStream.
          format("kafka").
          ...
          load
        
        val values = input.select('value cast "string")
        
        val extractValuesFromXML = udf { (xml: String) => ??? }
        val numbersFromXML = values.withColumn("number", extractValuesFromXML('value))
        
        // print XMLs and numbers to the stdout
        val q = numbersFromXML.
          writeStream.
          format("console").
          start
        

        另一种可能的解决方案是编写您自己的自定义流Source 来处理def getBatch(start: Option[Offset], end: Offset): DataFrame 中的XML 格式。 应该是可以工作的。

        【讨论】:

        • 谢谢你,Jacek。我编写了 UDF 来解析 XML 数据。它正在工作。我会尽快发布该 UDF。
        【解决方案5】:

        您不能以这种方式混合格式。 Kafka源加载为Row,包括值的数量,如keyvaluetopicvalue列存储payload as a binary type

        注意以下Kafka参数不能设置,Kafka source或sink会抛出异常:

        ...

        value.deserializer:值始终使用 ByteArrayDeserializer 反序列化为字节数组。使用 DataFrame 操作显式反序列化这些值。

        解析此内容是用户的责任,不能委托给其他数据源。例如,请参阅我对How to read records in JSON format from Kafka using Structured Streaming? 的回答。

        对于 XML,您可能需要一个 UDF (UserDefinedFunction),尽管您可以先尝试 Hive XPath functions。您还应该解码二进制数据。

        【讨论】:

          猜你喜欢
          • 2023-02-01
          • 2017-09-04
          • 2021-05-25
          • 2020-03-17
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2016-11-08
          相关资源
          最近更新 更多