【问题标题】:How to use Scala XML with Apache Flink?如何在 Apache Flink 中使用 Scala XML?
【发布时间】:2019-01-31 14:27:47
【问题描述】:

我正在尝试在 Flink 中使用 Scala XML 库来解析 XML,但我无法使其工作。请注意,我需要在同一处理函数中的代码上同时使用序列化和非序列化(字符串)版本。

我已经尝试了不同的解决方案,它们总是在 IntelliJ 中工作,但当我在 Flink 集群上运行它们时却不行。他们总是返回不同的java.lang.LinkageError: com/sun/org/apache/xerces/internal/jaxp/SAXParserImpl$JAXPSAXParser;我尝试了多种方法,但仍然遇到与此类似的错误。

这是我的 Flink 作业的示例:

object StreamingJob {
  import org.apache.flink.streaming.api.scala._

  val l = List(
    """<ciao>ciao</ciao>""",
  )

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // set up kafka section excluded
    env.setParallelism(10)

    val stream = env.fromCollection(l)

    stream
      .uid("process")
      .map(new Processor)
      .print

    env.execute("Flink-TEST")
  }
}

这是我的处理功能的一个例子:

import javax.xml.parsers.{SAXParser, SAXParserFactory}
import org.apache.flink.api.common.functions.MapFunction
import scala.xml.{Elem, XML}
import scala.xml.factory.XMLLoader

class Processor extends MapFunction[String, String] {
  override def map(translatedMessage: String): String = {
    val xml = Processor.xmlLoader.loadString(translatedMessage)
    xml.toString
  }
}
object Processor {
  val factory: SAXParserFactory = SAXParserFactory.newInstance
  val SAXParser: SAXParser = factory.newSAXParser
  val xmlLoader: XMLLoader[Elem] = XML.withSAXParser(SAXParser)
}

最后这是我的pom.xml,使用maven-shade插件制作我传递给flink的jar:

        <!-- other sections of the pom are excluded -->
        <properties>
            <flink.version>1.7.0</flink.version>
            <scala.binary.version>2.12</scala.binary.version>
            <scala.version>2.12.8</scala.version>
        </properties>
        <!-- other sections of the pom are excluded -->
    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Scala Library, provided by Flink as well. -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-xml</artifactId>
            <version>2.9.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.7.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.11.1</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.dataformat</groupId>
            <artifactId>jackson-dataformat-yaml</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api-scala_2.12</artifactId>
            <version>11.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang.modules</groupId>
            <artifactId>scala-xml_2.12</artifactId>
            <version>1.1.1</version>
        </dependency>
    </dependencies>
        <!-- other sections of the pom are excluded -->
<build>
        <plugins>
            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.mycompany.myproj.artifactId.default.StreamingJob</mainClass>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- Scala Compiler -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>build-helper-maven-plugin</artifactId>
                <version>1.7</version>
                <executions>
                    <!-- Add src/main/scala to eclipse build path -->
                    <execution>
                        <id>add-source</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>add-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/main/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                    <!-- Add src/test/scala to eclipse build path -->
                    <execution>
                        <id>add-test-source</id>
                        <phase>generate-test-sources</phase>
                        <goals>
                            <goal>add-test-source</goal>
                        </goals>
                        <configuration>
                            <sources>
                                <source>src/test/scala</source>
                            </sources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
        <!-- other sections of the pom are excluded -->

我相信这个问题在某种程度上与 Flink 在运行时使用的 SAXParser 的实现有关。我还尝试使用 @transient 注释来防止从 Flink 中持久化字段但没有成功。

但是我对到底发生了什么感到很困惑,有人知道如何防止错误以及出了什么问题吗?

【问题讨论】:

  • 您的问题可能与this有关
  • 看起来像,但是我没有看到适合我的用例的有效解决方案。
  • 排除org.apache.flink:force-shading 有什么原因吗?阴影通常有助于防止此类运行时错误
  • 因为我已经在flink下执行所以jar上不需要了。

标签: scala maven apache-flink maven-shade-plugin scala-xml


【解决方案1】:

过了一会儿,我发现它出了什么问题。

Scala XML 文档说:

在 Scala 2.11 及更高版本中,将以下内容添加到您的 build.sbt 文件的 库依赖:

"org.scala-lang.modules" %% "scala-xml" % "1.1.1"

在 Maven 中翻译成:

<dependency>
    <groupId>org.scala-lang.modules</groupId>
    <artifactId>scala-xml_2.12</artifactId>
    <version>1.1.1</version>
</dependency>

好吧,似乎不需要这种依赖关系,因为即使 Flink 1.7.2 似乎使用 Scala 2.12.8 它仍然在他的发行版中保留了 Scala XML(因此在类路径中),我相信这可能会导致哪个类出现问题实际加载,如果正确,但这可能不是链接错误的解决方案。

联动错误的解决方法其实是使用Flink自带的RichMapFunction[InputT, OutputT]

class Processor extends RichMapFunction[String, String] {
  var factory: SAXParserFactory = _
  var SAXParser: SAXParser = _
  var xmlLoader: XMLLoader[Elem] = _

  override def open(parameters: Configuration): Unit = {
    factory = SAXParserFactory.newInstance
    SAXParser = factory.newSAXParser
    xmlLoader = XML.withSAXParser(SAXParser)
  }

  override def map(translatedMessage: String): String = {
    val xml = xmlLoader.loadString(translatedMessage)
    xml.toString
  }
}

正如 JavaDoc 所说:

函数的初始化方法。

在实际调用之前调用 工作方法(如 mapjoin),因此适用于 一次设置工作。对于作为迭代一部分的函数,这 方法将在每次迭代超级步开始时被调用。

不幸的是,在这种情况下,var 的使用是首选,因为值/变量的初始化需要由 Flink 处理,以防止在运行时出现链接错误。

一些注意事项:

  • 我意识到这可能只发生在 DataStream[T] 而不是 DataSet[T]
  • Job 需要将并行度设置为大于 1 以导致多个任务管理器加载同一个类,如果在 IDE 中完成可能会很棘手,已解释 here
  • 在注意到此问题的原因后,似乎伴随对象可能不适合 Flink 使用。
  • 最后一部分可能是一个很好的说明,可以放在 Flink 的“Scala API 扩展”页面中,它还解释了 Flink 通常不支持匿名模式匹配函数来解构元组,除非使用 Flink Scala API 扩展:https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/scala_api_extensions.html

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2017-03-06
    • 2019-05-10
    • 1970-01-01
    • 2015-10-03
    • 2021-05-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多