【问题标题】:Flink KafkaConsumer fail to deserialise a composite avro schemaFlink Kafka Consumer 无法反序列化复合 avro 模式
【发布时间】:2021-10-20 12:36:18
【问题描述】:

我已经实施了这里建议的解决方案:Kafka consumer in flink, 所以我的代码如下所示:

Schema schema = Schema.parse("{\"type\":\"record\",\"name\":\"Envelope\" ... etc");
DataStreamSource<GenericRecord> stream = env.addSource(new FlinkKafkaConsumer<>(topic, AvroDeserializationSchema.forGeneric(schema), getKafkaCredentials()).setStartFromLatest());

但我一直收到错误消息 ... Caused by: java.lang.ArrayIndexOutOfBoundsException: -53,和 我尝试了不同的主题/模式对,我总是会得到相同的错误,但数字不同(-4 而不是 -53 等等)

可能是什么问题?有什么建议可以解决这个问题吗?


我看到的问题与Kafka consumer in flink 中的问题之间的唯一区别是我的架构有点复杂,这是我的架构的简化版本:

  "type": "record",
  "name": "Envelope",
  "namespace": "org.example.avro",
  "fields": [
    {
      "name": "somethingChanged",
      "type": "string"
    },
    {
      "name": "ts",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "old",
      "type": [
        "null",
        {
          "type": "record",
          "name": "OldValue",
          "fields": [
            {
              "name": "id",
              "type": [
                "null",
                "int"
              ],
              "default": null
            },
            {
              "name": "last_updated",
              "type": [
                "null",
                "string"
              ],
              "default": null
            }
          ]
        }
      ],
      "default": null
    },
    {
      "name": "data",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            {
              "name": "id",
              "type": [
                "null",
                "int"
              ],
              "default": null
            },
            {
              "name": "last_updated",
              "type": [
                "null",
                "string"
              ],
              "default": null
            }
          ]
        }
      ],
      "default": null
    }
  ]
}

来自 pom 文件

   <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.12.1</flink.version>
        <avro.version>1.10.0</avro.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

---- 编辑:共享一个实际工作的 pom 文件----

实施接受的答案后,我不得不修改 pom 文件。但是为了分享一个有用的工作 pom,我从头开始创建了一个新的纯 Flink 应用程序,下面是一个工作的 pom 文件:

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements.  See the NOTICE file
distributed with this work for additional information
regarding copyright ownership.  The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License.  You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.  See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>simpleFlink2</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.12.1</flink.version>
        <avro.version>1.10.0</avro.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- Add connector dependencies here. They must be in the default scope (compile). -->

        <!-- Example:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        -->

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.7</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
            <scope>runtime</scope>
        </dependency>

        <!-- FlinkKafkaConsumer -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- AvroDeserializationSchema -->
<!--        <dependency>-->
<!--            <groupId>org.apache.flink</groupId>-->
<!--            <artifactId>flink-avro</artifactId>-->
<!--            <version>${flink.version}</version>-->
<!--        </dependency>-->

        <!-- ConfluentRegistryAvroDeserializationSchema -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro-confluent-registry</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- to support for avro generator maven plugin -->
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>${avro.version}</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
                </configuration>
            </plugin>

            <!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
            <!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <!-- Run shade goal on package phase -->
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>org.example.StreamingJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- need to generate pojo from avsc files-->
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <stringType>String</stringType>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

        <pluginManagement>
            <plugins>

                <!-- This improves the out-of-the-box experience in Eclipse by resolving some warnings. -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!-- This profile helps to make things run out of the box in IntelliJ -->
    <!-- Its adds Flink's core classes to the runtime class path. -->
    <!-- Otherwise they are missing in IntelliJ, because the dependency is 'provided' -->
    <profiles>
        <profile>
            <id>add-dependencies-for-IDEA</id>

            <activation>
                <property>
                    <name>idea.version</name>
                </property>
            </activation>

            <dependencies>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-java</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>

                <!-- to avoid "No ExecutorFactory found to execute the application" -->
                <dependency>
                    <groupId>org.apache.flink</groupId>
                    <artifactId>flink-clients_${scala.binary.version}</artifactId>
                    <version>${flink.version}</version>
                    <scope>compile</scope>
                </dependency>

            </dependencies>
        </profile>
    </profiles>

</project>

【问题讨论】:

  • AvroDeserializationSchema.forGeneric(schema) 不适用于 Confluent 序列化的 Avro 数据。你应该使用ConfluentRegistryAvroDeserializationSchema + 见stackoverflow.com/questions/58849635/…
  • @OneCricketeer 感谢您指出这一点,确实应该添加到 Flink 的官方文档中。除此之外,我的问题是 pom 文件中有混合版本。您能否将您的评论发布为答案,以便我将其标记为已接受的答案。
  • FWIW,您应该避免使用 log4j 1.2.17,因为它存在安全问题,并使用不同的日志提供程序,例如 logback 或 log4j2。您还可以将 Scala 升级到至少 2.12

标签: java apache-kafka apache-flink avro confluent-schema-registry


【解决方案1】:

当您选择 Kafka 反序列化器格式时,您需要了解数据是如何产生的。

Confluent 有线格式与普通 Avro 不同,由于解析器不同,您可能会遇到这种越界错误。

看看ConfluentRegistryAvroDeserializationSchema类是否效果更好

参考 - https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/avro-confluent/

【讨论】:

  • 除此之外,我还必须修复 pom 文件中的版本组合,并删除主目录中的 .m2 文件夹。然后我再次尝试使用 AvroDeserializationSchema,但没有成功。切换回 ConfluentRegistryAvroDeserializationSchema 就像你建议的那样工作,很可能是因为我们的设置由 confluent kafka 和 confluent Schema 注册表组成。
  • 注意:我编辑了问题以包含一个工作 pom 文件。
  • 注意:没有“Confluent Kafka”之类的东西。您可以将 Schema Registry 与任何 Kafka 集群一起使用,但它始终是“Apache Kafka”
猜你喜欢
  • 2021-05-09
  • 2020-08-17
  • 2020-12-13
  • 2017-12-14
  • 2019-08-03
  • 2019-11-18
  • 2019-07-30
  • 2015-08-01
  • 1970-01-01
相关资源
最近更新 更多