【问题标题】:FlatMapElement Kotlin Beam non Serializable lambdaFlatMapElement Kotlin Beam 不可序列化 lambda
【发布时间】:2021-04-30 23:24:30
【问题描述】:

我有一个使用 Java 8、Apache Beam 2.27.0、Maven 和 Dagger 2 的现有 Apache Beam 项目。

我在 Kotlin 中迁移了这个项目:Kotlin JDK 8 with version 1.5.0。

我使用了 1.5.0 版本的 Kotlin,因为 1.4.3 的 Beam 和 Maven 插件存在问题(无法读取类:VirtualFile : Kotlin 1.4.30 Apache beam compilation error

除了使用带有 Typedescriptor 和 lambda 表达式的原生 MapElement 或 FlatMapElement 之外,一切似乎都很好。

我的 pom.xml 文件的一部分

<properties>
        <beam.version>2.27.0</beam.version>

        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <kotlin.code.style>official</kotlin.code.style>
        <kotlin.compiler.jvmTarget>1.8</kotlin.compiler.jvmTarget>
        <kotlin.compiler.incremental>true</kotlin.compiler.incremental>

        <kotlin.version>1.5.0</kotlin.version>
        <serialization.version>1.2.0</serialization.version>
        <java.version>1.8</java.version>
        
        <dagger.version>2.35.1</dagger.version>
        <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
        <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
        <maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version>
<properties>


<dependencies>
       <dependency>
            <groupId>org.jetbrains.kotlin</groupId>
            <artifactId>kotlin-stdlib-jdk8</artifactId>
            <version>${kotlin.version}</version>
        </dependency>
        <dependency>
            <groupId>org.jetbrains.kotlinx</groupId>
            <artifactId>kotlinx-serialization-json</artifactId>
            <version>${serialization.version}</version>
        </dependency>
        
        <dependency>
                <groupId>org.apache.beam</groupId>
                <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
                <version>${beam.version}</version>
                <scope>runtime</scope>
            </dependency>

            <dependency>
                <groupId>org.apache.beam</groupId>
                <artifactId>beam-sdks-java-core</artifactId>
                <version>${beam.version}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.beam</groupId>
                <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
                <version>${beam.version}</version>
            </dependency>

            <dependency>
                <groupId>org.apache.beam</groupId>
                <artifactId>beam-sdks-java-io-redis</artifactId>
                <version>${beam.version}</version>
            </dependency>
        <dependency>
            <groupId>org.jetbrains.kotlin</groupId>
            <artifactId>kotlin-test-junit</artifactId>
            <version>${kotlin.version}</version>
            <scope>test</scope>
        </dependency>
<dependencies>

<build>
        <plugins>

           <plugin>
                <groupId>org.jetbrains.kotlin</groupId>
                <artifactId>kotlin-maven-plugin</artifactId>
                <version>${kotlin.version}</version>
                <executions>
                    <execution>
                        <id>kapt</id>
                        <goals>
                            <goal>kapt</goal>
                        </goals>
                        <configuration>
                            <sourceDirs>
                                <sourceDir>src/main/kotlin</sourceDir>
                            </sourceDirs>
                            <annotationProcessorPaths>
                                <annotationProcessorPath>
                                    <groupId>com.google.dagger</groupId>
                                    <artifactId>dagger-compiler</artifactId>
                                    <version>${dagger.version}</version>
                                </annotationProcessorPath>
                            </annotationProcessorPaths>
                        </configuration>
                    </execution>
                    <execution>
                        <id>compile</id>
                        <phase>process-sources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <sourceDirs>
                                <sourceDir>src/main/kotlin</sourceDir>
                            </sourceDirs>
                        </configuration>
                    </execution>

                    <execution>
                        <id>test-kapt</id>
                        <goals>
                            <goal>test-kapt</goal>
                        </goals>
                        <configuration>
                            <sourceDirs>
                                <sourceDir>src/test/kotlin</sourceDir>
                            </sourceDirs>
                            <annotationProcessorPaths>
                                <annotationProcessorPath>
                                    <groupId>com.google.dagger</groupId>
                                    <artifactId>dagger-compiler</artifactId>
                                    <version>${dagger.version}</version>
                                </annotationProcessorPath>
                            </annotationProcessorPaths>
                        </configuration>
                    </execution>
                    <execution>
                        <id>test-compile</id>
                        <goals>
                            <goal>test-compile</goal>
                        </goals>
                        <configuration>
                            <sourceDirs>
                                <sourceDir>src/test/kotlin</sourceDir>
                                <sourceDir>target/generated-sources/kapt/test</sourceDir>
                            </sourceDirs>
                        </configuration>
                    </execution>
                </executions>
                <configuration>
                    <compilerPlugins>
                        <plugin>kotlinx-serialization</plugin>
                    </compilerPlugins>
                </configuration>
                <dependencies>
                    <dependency>
                        <groupId>org.jetbrains.kotlin</groupId>
                        <artifactId>kotlin-maven-serialization</artifactId>
                        <version>${kotlin.version}</version>
                    </dependency>
                </dependencies>
            </plugin>
            
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>${maven-surefire-plugin.version}</version>
                <dependencies>
                    <dependency>
                        <groupId>org.apache.maven.surefire</groupId>
                        <artifactId>surefire-junit47</artifactId>
                        <version>${maven-surefire-plugin.version}</version>
                    </dependency>
                </dependencies>
            </plugin>

            <plugin>
                <groupId>org.codehaus.mojo</groupId>
                <artifactId>exec-maven-plugin</artifactId>
                <version>${maven-exec-plugin.version}</version>
                <configuration>
                    <cleanupDaemonThreads>false</cleanupDaemonThreads>
                </configuration>
            </plugin>
 </plugins>

实现 Serializable (java.io) 的对象

data class MyObject(
    val field: String = ""
) : Serializable {

基本上我想用 Typedescriptor 和 lambda 执行一个 FlatMapElement(在场景后面是一个 SerializableFunction)

class MyTransform(private val redisConnectionConf: RedisConnectionConfiguration) :
    PTransform<PBegin, PCollection<MyObject>>() {

    override fun expand(input: PBegin): PCollection<MyObject> {
        return input
            .apply(RedisIO.read().withConnectionConfiguration(redisConnectionConf).withKeyPattern("my-pattern*"))
            .apply(
                FlatMapElements.into(of(MyObject::class.java))
                    .via(SerializableFunction<KV<String, String>, List<MyObject>> { toMyObjects(it) })
            )
    }

fun toMyObjects(entry: KV<String, String>): List<MyObject> {
        val key = entry.key
        val value = entry.value
        
        val ref = object : TypeReference<List<MyObject>>() {}
        return OBJECT_MAPPER.readValue(value, ref)
    }

我自愿更改了代码并将部分代码放入方法“toMyObjects”中,以提供最大的元素。 “OBJECT_MAPPER”对象是 Jackson 对象映射器。

使用 Java 8 和 Beam 2.27.0,此基本代码运行良好。

对于 Kotlin,此代码不适用于以下错误:

at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray (SerializableUtils.java:59)
    at org.apache.beam.runners.core.construction.ParDoTranslation.translateDoFn (ParDoTranslation.java:692)
    at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator$1.translateDoFn (PrimitiveParDoSingleFactory.java:218)
    at org.apache.beam.runners.core.construction.ParDoTranslation.payloadForParDoLike (ParDoTranslation.java:814)
    at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.payloadForParDoSingle (PrimitiveParDoSingleFactory.java:214)
    at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.translate (PrimitiveParDoSingleFactory.java:163)
    at org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate (PTransformTranslation.java:429)
    at org.apache.beam.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:239)
    at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform (SdkComponents.java:175)
    at org.apache.beam.runners.core.construction.PipelineTranslation$1.visitPrimitiveTransform (PipelineTranslation.java:87)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:587)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
    at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
    at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:59)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:933)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:196)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
    at myPackage.MyApp.main (MyApp.kt:44)
    at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:748)
Caused by: java.io.NotSerializableException: Non-serializable lambda
    at mypackage.MyTransform$$Lambda$783/1784079343.writeObject (Unknown Source)


[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project my-project: 
An exception occured while executing the Java class. unable to serialize 
DoFnWithExecutionInformation{doFn=org.apache.beam.sdk.transforms.FlatMapElements$2@23402e70,
mainOutputTag=Tag<org.apache.beam.sdk.values.PCollection.<init>:402#6929f09b03d242ca>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}: Non-serializable lambda -> [Help 1]

Beam sdk 中的 SerializableUtils.serializeToByteArray 方法发送此错误:java.io.NotSerializableException: Non-serializable lambda

MyObject 是可序列化的,而 lambda 封装在 Beam SerializableFunction(实现可序列化的函数)中。

通常在这种情况下,Beam 从 Serializable 对象中获取一个 SerializableCoder。 我不明白为什么 Beam 将 lambda 视为不可序列化。

我在 Java 中没有这种行为。

确切地说,如果我用 ParDo.of(DoFn) 替换 FlatMapElement/descriptor/lambda,这可以正常工作,但在某些情况下,为了更好的简洁性和可读性,我想使用内置的 MapElement 和 FlatMapElement lambda 表达式。

提前感谢您的帮助。

【问题讨论】:

  • 我对一个小项目进行了相同的测试,并且只有所需的依赖项(Kotlin 1.5.0 和 Beam 2.27.0 with Maven),我遇到了完全相同的问题。

标签: maven kotlin java-8 apache-beam


【解决方案1】:

我终于找到了解决方案,我将 Kotlin 版本(依赖项 + 插件)降级到 1.4.21。

在这种情况下,Lambda non Serializable 的问题消失了,kotlin Maven 插件在编译时没有虚拟文件问题:Kotlin 1.4.30 Apache beam compilation error

这个话题对我帮助很大,谢谢:https://youtrack.jetbrains.com/issue/KT-45067

如果 Kotlin maven 插件在大于 1.4.21 的版本 1.4.x 上正常工作,也许将来会很棒。

使用 Kotlin 和 Maven 的 Beam 开发人员必须小心处理此问题,1.4.32 无法使用 Beam 进行编译,而 1.5.0 在运行时会出现 Lambda 不可序列化的问题。

【讨论】:

    【解决方案2】:

    如果您想使用 Kotlin 1.5,请尝试以下解决方法: use -Xsam-conversions=class

    <plugins>
      <plugin>
        <groupId>org.jetbrains.kotlin</groupId>
        <artifactId>kotlin-maven-plugin</artifactId>
        <version>${kotlin.version}</version>
        <configuration>
          <args>
            <arg>-Xsam-conversions=class</arg>
          </args>
        </configuration>
      </plugin>
    </plugins>
    

    参考:https://youtrack.jetbrains.com/issue/KT-46359#focus=Comments-27-4862857.0-0

    【讨论】:

    • 非常感谢您的回答,这对我来说很宝贵??我明天要测试它。
    【解决方案3】:

    当我用一个实现 SerializableFunction 函数的类替换 lambda 时,这可行

    class MapString : SerializableFunction<KV<String, String>, List<MyObject>> {
            override fun apply(input: KV<String, String>): List<MyObject> {
                ....
            }
        }
    

    我将问题保持打开状态,因为我想要一个使用 lambda 表达式的解决方案。

    【讨论】:

    • 我的印象是 Kotlin lambda 在我的配置中不会像 Java lambda 那样被解释。在 pom 中,我使用了 kotlin jdk8 和 kotlin jvm 目标到 1.8。
    猜你喜欢
    • 1970-01-01
    • 2019-03-10
    • 2021-07-28
    • 1970-01-01
    • 1970-01-01
    • 2019-09-18
    • 1970-01-01
    • 1970-01-01
    • 2021-02-01
    相关资源
    最近更新 更多