【问题标题】:Apache Flink JobListener does not work expectedApache Flink JobListener 无法正常工作
【发布时间】:2020-08-27 14:19:15
【问题描述】:

我在 flink 1.11.1 中写了一个 flink 批处理作业。作业成功完成后,我想做一些类似调用http服务的事情。

我添加了一个简单的作业侦听器来挂钩作业状态。问题是当 kafka sink 操作员抛出错误时,作业侦听器不会触发。我希望当我的作业失败时,它应该触发我的作业侦听器并打印失败日志。

我如何确定工作是否成功完成?

我们将不胜感激。

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.registerJobListener(new JobListener {
      override def onJobSubmitted(jobClient: JobClient, throwable: Throwable): Unit = {
        if (throwable == null) {
          log.info("SUCCESS")
        } else {
          log.info("FAIL")
        }
      }

      override def onJobExecuted(jobExecutionResult: JobExecutionResult, throwable: Throwable): Unit = {

        if (throwable == null) {
          log.info("SUCCESS")
        } else {
          log.info("FAIL")
        }
      }
    })

    env.createInput(input)
      .filter(r => Option(r.token).getOrElse("").nonEmpty)
      .addSink(kafkaProducer)

【问题讨论】:

    标签: apache-flink flink-streaming job-scheduling flink-batch


    【解决方案1】:

    如果您尝试在集群上运行作业,您可以使用您的作业 ID 在控制台中查看您的记录器消息和标准输出。请参考所附截图,

    如果您在本地集群上运行,默认 url 可能是 http://localhost:8081。

    同样,以下不是检查您的工作是否成功的正确方法。

    if (throwable == null) {
              log.info("SUCCESS")
            } else {
              log.info("FAIL")
            }
    
    

    【讨论】:

    猜你喜欢
    • 2016-12-01
    • 2017-08-15
    • 2014-02-23
    • 2016-05-28
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-11-13
    相关资源
    最近更新 更多