【问题标题】:NullPointerException exception when using Flink's leftOuterJoinLateral in Scala在 Scala 中使用 Flink 的 leftOuterJoinLateral 时出现 NullPointerException 异常
【发布时间】:2021-02-16 19:49:52
【问题描述】:

我正在尝试关注the documentation 并创建一个表函数来“展平”一些数据。使用joinLateral 进行展平时,表函数似乎工作正常。但是,当使用 leftOuterJoinLateral 时,我收到以下错误。我正在使用 Scala 并尝试了 Table API 和 SQL,结果相同:

原因:java.lang.NullPointerException:空结果不能存储在案例类中。

这是我的工作:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.functions.TableFunction

object example_job{
  // Split the List[Int] into multiple rows
  class Split() extends TableFunction[Int] {
    def eval(nums: List[Int]): Unit = {
      nums.foreach(x =>
        if(x != 3) {
          collect(x)
      })
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()
    val tableEnv = StreamTableEnvironment.create(env)
    val splitMe = new Split()

    // Create some dummy data
    val events: DataStream[(String, List[Int])] = env.fromElements(("simon", List(1,2,3)), ("jessica", List(3)))
    
    val table = tableEnv.fromDataStream(events, 'name, 'numbers)
      .leftOuterJoinLateral(splitMe('numbers) as 'number)
      .select('name, 'number)
    table.toAppendStream[(String, Int)].print()
    env.execute("Flink jira ticket example")
  }
}

当我将 .leftOuterJoinLateral 更改为 .joinLateral 时,我得到了预期的结果:

(simon,1)
(simon,2)

当使用 .leftOuterJoinLateral 时,我会期待这样的结果:

(simon,1)
(simon,2)
(simon,null) // or (simon, None)
(jessica,null) // or (jessica, None)

看起来这可能是 Scala API 的错误?我想在提出罚单之前先检查这里,以防我做一些愚蠢的事情!

【问题讨论】:

    标签: scala apache-flink flink-streaming flink-sql


    【解决方案1】:

    问题在于,默认情况下,Flink 确实期望一行的所有字段都是非空的。这就是程序在看到外连接操作的null 结果时失败的原因。为了接受null 值,您需要通过

    禁用空检查
    val tableConfig = tableEnv.getConfig
    tableConfig.setNullCheck(false)
    

    或者您必须指定结果类型以容忍空值,例如指定自定义 POJO 输出类型:

    table.toAppendStream[MyOutput].print()
    

    class MyOutput(var name: String, var number: Integer) {
      def this() {
        this(null, null)
      }
    
      override def toString: String = s"($name, $number)"
    }
    

    【讨论】:

    • 由于某种原因,当更新 tableConfig.setNullCheck(false) 时,结果变为:(simon,1)(jessica,-1)(simon,2)??我认为最好的选择是从 UDTF 返回 Option[Int] 而不是不返回任何内容。
    • 那是因为Int 不能是null
    • 哦,好的,我现在明白了,谢谢。但我希望 (simon,1) (simon,2) (simon,-1) (jessica,-1) 对吗?仍然缺少 Simon 记录,还是我不了解左侧外侧连接?
    • 我认为结果是正确的,因为leftOuterJoinLateral 调用将每一行与指定表函数的结果相连接。在您的情况下,应用于(simon, List(1,2,3)) 的表函数将产生(1), (2),因此,结果是(simon, 1)(simon, 2)。对于(jessica, List(3)),表函数生成None,因此结果为(jessica, -1/null)。我想文档可能更明确,表函数结果仅与其输入行连接。
    • 谢谢,我想我现在明白了。我认为从 table 函数返回的每条记录都连接回输入行,但正如你所说,看起来整个表都被连接了。感谢您帮助我更好地理解它!
    猜你喜欢
    • 1970-01-01
    • 2014-09-05
    • 2020-05-09
    • 1970-01-01
    • 1970-01-01
    • 2018-08-08
    • 2019-12-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多