【问题标题】:Kotlin Iterable not supported in Apache Beam?Apache Beam 不支持 Kotlin Iterable?
【发布时间】:2019-09-18 09:51:43
【问题描述】:

Apache Beam 似乎拒绝识别 Kotlin 的 Iterable。这是一个示例代码:

@ProcessElement
fun processElement(
    @Element input: KV<String, Iterable<String>>, receiver: OutputReceiver<String>
) {
    val output = input.key + "|" + input.value.toString()
    println("output: $output")
    receiver.output(output)
}

我收到以下奇怪的错误:

java.lang.IllegalArgumentException:
   ...PrintString, @ProcessElement processElement(KV, OutputReceiver), @ProcessElement processElement(KV, OutputReceiver):
   @Element argument must have type org.apache.beam.sdk.values.KV<java.lang.String, java.lang.Iterable<? extends java.lang.String>>

果然,如果我将Iterable 替换为java.lang.Iterable,同样的代码就可以正常工作。我做错了什么?

依赖的版本:

  • kotlin-jvm:1.3.21
  • org.apache.beam:2.11.0

这是一个包含完整代码和堆栈跟踪的要点:

更新

经过一番反复试验,我发现虽然List&lt;String&gt; 会引发类似的异常,但MutableList&lt;String&gt; 确实有效:

class PrintString: DoFn<KV<String, MutableList<String>>, String>() {
    @ProcessElement
    fun processElement(
        @Element input: KV<String, MutableList<String>>, receiver: OutputReceiver<String>
    ) {
        val output = input.key + "|" + input.value.toString()
        println("output: $output")
        receiver.output(output)
    }
}

所以,这提醒了我 Kotlin 的 Immutable 集合实际上只是接口,而底层集合仍然是可变的。但是,尝试将 Iterable 替换为 MutableIterable 会继续引发错误。

更新 2

我使用上述MutableList 部署了我的 Kotlin Dataflow 作业,但作业失败:

java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.ClassCastException:
org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowViaIteratorsFn$WindowReiterable cannot be cast to java.util.List
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:184)
    at org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:102)

我不得不切换回使用java.lang.Iterable

【问题讨论】:

  • 这是在运行时还是在编译时?你能分享更多的堆栈跟踪吗?
  • @mkobit 堆栈跟踪已添加到 gist.github.com/marcoslin/…。谢谢

标签: kotlin google-cloud-dataflow apache-beam


【解决方案1】:

GroupByKey 之后使用ParDo 时,我也遇到了这个问题。事实证明,在编写接受GroupByKey 的结果的转换时,Iterable 泛型类型中需要@JvmWildcard 注释。

请参阅下面的人为示例,该示例读取文件并按每行的第一个字符进行分组。

class BeamPipe {
  class ConcatLines : DoFn<KV<String, Iterable<@JvmWildcard String>>, KV<String, String>>() {
    @ProcessElement
    fun processElement(@Element input: KV<String, Iterable<@JvmWildcard String>>, receiver: OutputReceiver<KV<String, String>>) {
      receiver.output(KV.of(input.key, input.value.joinToString("\n")))
    }
  }

  fun pipe(options: PipelineOptions) {
    val file =
        "testFile.txt"
    val p = Pipeline.create(options)
    p.apply(TextIO.read().from(file))
        .apply("Key lines by first character",
            WithKeys.of { line: String -> line[0].toString() }
                .withKeyType(TypeDescriptors.strings()))
        .apply("Group lines by first character", GroupByKey.create<String, String>())
        .apply("Concatenate lines", ParDo.of(ConcatLines()))
        .apply("Write to files", FileIO.writeDynamic<String, KV<String, String>>()
            .by { it.key }
            .withDestinationCoder(StringUtf8Coder.of())
            .via(Contextful.fn(ProcessFunction { it.value }), TextIO.sink())
            .to("whatever")
            .withNaming { key -> FileIO.Write.defaultNaming(key, ".txt") }
        )
    p.run()
  }
}

【讨论】:

    【解决方案2】:

    这看起来像是 Beam Kotlin SDK 中的一个错误。 @ProcessElement 方法的反射分析无法正常工作。您可以通过使用ProcessContext ctx 而不是使用@Element 参数来解决此问题。

    【讨论】:

    • 谢谢@mxm。我试过了,但仍然得到相同的IllegalArgumentException。在上面的要点中添加了示例代码和堆栈跟踪。与其他示例类似,MutableList 工作正常。
    • 知道了。感谢更新!我们将对此进行调查。
    【解决方案3】:

    我对 kotlin 不是很熟悉,但您似乎需要先导入 import java.lang.Iterable,然后才能在代码中使用它。

    【讨论】:

    • 谢谢,但根据我的问题,如果我进行导入,它确实有效。 Kotlin 不需要这样做,因为 Iterable 等同于 java.lang.Iterable。这个问题也体现在其他领域。例如,当使用返回Iterable(我无法设置)的GroupByKey 时,与ParDo 结合使用时会引发相同的错误。
    【解决方案4】:

    当我们从 groupbykey.create() 获取可迭代对象时,我可以知道如何解决这个问题吗?我不能像你做 javalang iterable 那样 groupbykey

    【讨论】:

    • 这不是问题的答案
    • @HardWorker 你应该用一些示例代码提出问题,其他可以轻松运行。我可以尝试进一步提供更多信息。
    • @HardWorker 刚刚注意到您遇到了类似的问题。我上面的回答可能对你有用。
    【解决方案5】:

    对于那些遇到此问题并在这里找到方法的人,我目前在 kotlin 中继续编写管道的解决方法是创建一个 Java 静态类,该类具有创建、包含和处理您的 Iterable 的函数。然后可以将结果(以不可迭代的格式)传递回 kotlin。

    【讨论】:

      猜你喜欢
      • 2017-05-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-12-07
      • 1970-01-01
      • 1970-01-01
      • 2020-04-08
      • 1970-01-01
      相关资源
      最近更新 更多