Posted: 2021-04-30 23:24:30
Question:
I have an existing Apache Beam project that uses Java 8, Apache Beam 2.27.0, Maven and Dagger 2.
I migrated this project to Kotlin: Kotlin for JDK 8, version 1.5.0.
I used Kotlin 1.5.0 because Kotlin 1.4.30 has a problem with Beam and the Maven plugin (unable to read class: VirtualFile; see "Kotlin 1.4.30 Apache beam compilation error").
Everything seems to work, except when I use the native MapElements or FlatMapElements with a TypeDescriptor and a lambda expression.
Part of my pom.xml file:
<properties>
<beam.version>2.27.0</beam.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kotlin.code.style>official</kotlin.code.style>
<kotlin.compiler.jvmTarget>1.8</kotlin.compiler.jvmTarget>
<kotlin.compiler.incremental>true</kotlin.compiler.incremental>
<kotlin.version>1.5.0</kotlin.version>
<serialization.version>1.2.0</serialization.version>
<java.version>1.8</java.version>
<dagger.version>2.35.1</dagger.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
<maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
<maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version>
</properties>
<dependencies>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-stdlib-jdk8</artifactId>
<version>${kotlin.version}</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlinx</groupId>
<artifactId>kotlinx-serialization-json</artifactId>
<version>${serialization.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-redis</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-test-junit</artifactId>
<version>${kotlin.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-maven-plugin</artifactId>
<version>${kotlin.version}</version>
<executions>
<execution>
<id>kapt</id>
<goals>
<goal>kapt</goal>
</goals>
<configuration>
<sourceDirs>
<sourceDir>src/main/kotlin</sourceDir>
</sourceDirs>
<annotationProcessorPaths>
<annotationProcessorPath>
<groupId>com.google.dagger</groupId>
<artifactId>dagger-compiler</artifactId>
<version>${dagger.version}</version>
</annotationProcessorPath>
</annotationProcessorPaths>
</configuration>
</execution>
<execution>
<id>compile</id>
<phase>process-sources</phase>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<sourceDirs>
<sourceDir>src/main/kotlin</sourceDir>
</sourceDirs>
</configuration>
</execution>
<execution>
<id>test-kapt</id>
<goals>
<goal>test-kapt</goal>
</goals>
<configuration>
<sourceDirs>
<sourceDir>src/test/kotlin</sourceDir>
</sourceDirs>
<annotationProcessorPaths>
<annotationProcessorPath>
<groupId>com.google.dagger</groupId>
<artifactId>dagger-compiler</artifactId>
<version>${dagger.version}</version>
</annotationProcessorPath>
</annotationProcessorPaths>
</configuration>
</execution>
<execution>
<id>test-compile</id>
<goals>
<goal>test-compile</goal>
</goals>
<configuration>
<sourceDirs>
<sourceDir>src/test/kotlin</sourceDir>
<sourceDir>target/generated-sources/kapt/test</sourceDir>
</sourceDirs>
</configuration>
</execution>
</executions>
<configuration>
<compilerPlugins>
<plugin>kotlinx-serialization</plugin>
</compilerPlugins>
</configuration>
<dependencies>
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-maven-serialization</artifactId>
<version>${kotlin.version}</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin.version}</version>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit47</artifactId>
<version>${maven-surefire-plugin.version}</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>${maven-exec-plugin.version}</version>
<configuration>
<cleanupDaemonThreads>false</cleanupDaemonThreads>
</configuration>
</plugin>
</plugins>
</build>
An object that implements Serializable (java.io):
data class MyObject(
    val field: String = ""
) : Serializable
Basically, I want to run a FlatMapElements with a TypeDescriptor and a lambda (behind the scenes, the lambda is a SerializableFunction):
class MyTransform(private val redisConnectionConf: RedisConnectionConfiguration) :
    PTransform<PBegin, PCollection<MyObject>>() {

    override fun expand(input: PBegin): PCollection<MyObject> {
        return input
            .apply(RedisIO.read().withConnectionConfiguration(redisConnectionConf).withKeyPattern("my-pattern*"))
            .apply(
                FlatMapElements.into(TypeDescriptor.of(MyObject::class.java))
                    .via(SerializableFunction<KV<String, String>, List<MyObject>> { toMyObjects(it) })
            )
    }

    fun toMyObjects(entry: KV<String, String>): List<MyObject> {
        val key = entry.key
        val value = entry.value
        val ref = object : TypeReference<List<MyObject>>() {}
        return OBJECT_MAPPER.readValue(value, ref)
    }
}
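I deliberately simplified the code and moved part of it into the method "toMyObjects" to show as much detail as possible. The "OBJECT_MAPPER" object is a Jackson ObjectMapper.
With Java 8 and Beam 2.27.0, this basic code works fine.
With Kotlin, this code fails with the following error: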
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray (SerializableUtils.java:59)
at org.apache.beam.runners.core.construction.ParDoTranslation.translateDoFn (ParDoTranslation.java:692)
at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator$1.translateDoFn (PrimitiveParDoSingleFactory.java:218)
at org.apache.beam.runners.core.construction.ParDoTranslation.payloadForParDoLike (ParDoTranslation.java:814)
at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.payloadForParDoSingle (PrimitiveParDoSingleFactory.java:214)
at org.apache.beam.runners.dataflow.PrimitiveParDoSingleFactory$PayloadTranslator.translate (PrimitiveParDoSingleFactory.java:163)
at org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate (PTransformTranslation.java:429)
at org.apache.beam.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:239)
at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform (SdkComponents.java:175)
at org.apache.beam.runners.core.construction.PipelineTranslation$1.visitPrimitiveTransform (PipelineTranslation.java:87)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:587)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:59)
at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:933)
at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:196)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
at myPackage.MyApp.main (MyApp.kt:44)
at sun.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:748)
Caused by: java.io.NotSerializableException: Non-serializable lambda
at mypackage.MyTransform$$Lambda$783/1784079343.writeObject (Unknown Source)
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project my-project:
An exception occured while executing the Java class. unable to serialize
DoFnWithExecutionInformation{doFn=org.apache.beam.sdk.transforms.FlatMapElements$2@23402e70,
mainOutputTag=Tag<org.apache.beam.sdk.values.PCollection.<init>:402#6929f09b03d242ca>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}: Non-serializable lambda -> [Help 1]
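The method SerializableUtils.serializeToByteArray in the Beam SDK throws this error: java.io.NotSerializableException: Non-serializable lambda
MyObject is serializable, and the lambda is wrapped in a Beam SerializableFunction (a function that implements Serializable).
Normally in this case Beam obtains a SerializableCoder from the Serializable object. I don't understand why Beam treats the lambda as non-serializable.
I don't get this behavior in Java.
To be precise, if I replace the FlatMapElements/TypeDescriptor/lambda with ParDo.of(DoFn), it works fine, but in some cases I would like to use the built-in MapElements and FlatMapElements with lambda expressions, for conciseness and readability.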
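The stack trace fails inside writeObject of a runtime-generated class (MyTransform$$Lambda$783), which suggests the lambda is not compiled to an ordinary class. The following is a minimal sketch, not Beam code, of the alternative shape: an explicit anonymous object that implements a serializable function interface and survives a Java serialization round trip. `SerFn`, `splitToObjects` and `roundTrip` are hypothetical names introduced for illustration; `SerFn` only stands in for Beam's SerializableFunction so the sketch runs without Beam on the classpath.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.io.Serializable

// Hypothetical stand-in for Beam's SerializableFunction (assumption: same shape, one apply method).
interface SerFn<I, O> : Serializable {
    fun apply(input: I): O
}

data class MyObject(val field: String = "") : Serializable

// An explicit anonymous object is compiled to a regular class that implements
// Serializable through SerFn, so no runtime-generated lambda class is involved.
fun splitToObjects(): SerFn<String, List<MyObject>> =
    object : SerFn<String, List<MyObject>> {
        override fun apply(input: String): List<MyObject> =
            input.split(",").map { MyObject(it) }
    }

// Serializes a value to bytes and reads it back, similar in spirit to what
// Beam does with a function before submitting the pipeline.
@Suppress("UNCHECKED_CAST")
fun <T : Serializable> roundTrip(value: T): T {
    val buffer = ByteArrayOutputStream()
    ObjectOutputStream(buffer).use { it.writeObject(value) }
    return ObjectInputStream(ByteArrayInputStream(buffer.toByteArray())).use { it.readObject() as T }
}

fun main() {
    val fn = roundTrip(splitToObjects())
    println(fn.apply("a,b")) // prints [MyObject(field=a), MyObject(field=b)]
}
```

In the transform above, the corresponding change would presumably be to pass `object : SerializableFunction<KV<String, String>, List<MyObject>> { override fun apply(input: KV<String, String>) = toMyObjects(input) }` to `via(...)`; this has not been tested against Beam 2.27.0 here. The Kotlin 1.5.0 release notes also describe a compiler option, `-Xsam-conversions=class`, that switches SAM conversions back to generated classes; whether it resolves this particular error is likewise untested.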
Thanks in advance for your help.
Comments:
- I ran the same test on a small project with only the required dependencies (Kotlin 1.5.0 and Beam 2.27.0 with Maven), and I hit exactly the same problem.
Tags: maven kotlin java-8 apache-beam